This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc by this push:
     new 4f07d68c feat(io_uring): move to compio runtime (#1960)
4f07d68c is described below

commit 4f07d68c87c6556fcbc82bdd59653ab1783990d1
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Fri Jul 4 15:27:28 2025 +0200

    feat(io_uring): move to compio runtime (#1960)
---
 Cargo.lock                                         | 418 ++++++++++++++-------
 core/server/Cargo.toml                             |   4 +-
 .../handlers/messages/poll_messages_handler.rs     |  25 +-
 .../binary/handlers/topics/create_topic_handler.rs |   3 +-
 core/server/src/binary/sender.rs                   |  17 +-
 core/server/src/bootstrap.rs                       |  35 +-
 .../src/compat/index_rebuilding/index_rebuilder.rs |  42 ++-
 core/server/src/io/file.rs                         | 117 ------
 core/server/src/io/fs_utils.rs                     |   2 +-
 core/server/src/io/mod.rs                          |   3 -
 core/server/src/io/reader.rs                       |  44 ---
 core/server/src/io/writer.rs                       |  57 ---
 core/server/src/log/runtime.rs                     |  48 +++
 core/server/src/main.rs                            |  52 ++-
 core/server/src/quic/quic_sender.rs                |   5 +-
 core/server/src/shard/mod.rs                       |  15 +-
 core/server/src/shard/system/snapshot/mod.rs       |  50 ++-
 core/server/src/shard/system/storage.rs            |  11 +-
 core/server/src/shard/system/streams.rs            |   2 +-
 core/server/src/shard/system/topics.rs             |  35 +-
 core/server/src/shard/task_registry.rs             |   8 +-
 core/server/src/shard/tasks/messages.rs            |   2 +-
 core/server/src/shard/transmission/event.rs        |   3 +-
 core/server/src/state/file.rs                      |  36 +-
 core/server/src/streaming/partitions/storage.rs    |  27 +-
 core/server/src/streaming/persistence/persister.rs |  24 +-
 core/server/src/streaming/persistence/task.rs      |  10 +-
 .../src/streaming/segments/indexes/index_reader.rs |  16 +-
 .../src/streaming/segments/indexes/index_writer.rs |  15 +-
 .../streaming/segments/messages/messages_reader.rs |  20 +-
 .../streaming/segments/messages/messages_writer.rs |  23 +-
 core/server/src/streaming/segments/messages/mod.rs |  32 +-
 core/server/src/streaming/segments/segment.rs      |   6 +-
 core/server/src/streaming/streams/storage.rs       |   6 +-
 core/server/src/streaming/topics/storage.rs        |   6 +-
 core/server/src/streaming/utils/file.rs            |  14 +-
 core/server/src/streaming/utils/pooled_buffer.rs   |  34 +-
 core/server/src/tcp/connection_handler.rs          |   1 +
 core/server/src/tcp/sender.rs                      |  62 +--
 core/server/src/tcp/tcp_listener.rs                |  12 +-
 core/server/src/tcp/tcp_sender.rs                  |   9 +-
 core/server/src/tcp/tcp_tls_listener.rs            | 185 ++++-----
 core/server/src/tcp/tcp_tls_sender.rs              |  18 +-
 43 files changed, 809 insertions(+), 745 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index ccecbd7b..d9383e8a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -142,7 +142,7 @@ dependencies = [
  "actix-utils",
  "futures-core",
  "futures-util",
- "mio 1.0.4",
+ "mio",
  "socket2",
  "tokio",
  "tracing",
@@ -307,6 +307,15 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "aligned-array"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "e05c92d086290f52938013f6242ac62bf7d401fab8ad36798a609faa65c3fd2c"
+dependencies = [
+ "generic-array",
+]
+
 [[package]]
 name = "alloc-no-stdlib"
 version = "2.0.4"
@@ -578,6 +587,12 @@ dependencies = [
  "syn 2.0.104",
 ]
 
+[[package]]
+name = "async-task"
+version = "4.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de"
+
 [[package]]
 name = "async-trait"
 version = "0.1.88"
@@ -643,17 +658,6 @@ dependencies = [
  "webpki-roots 0.26.11",
 ]
 
-[[package]]
-name = "auto-const-array"
-version = "0.2.2"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "fd73835ad7deb4bd2b389e6f10333b143f025d607c55ca04c66a0bcc6bb2fc6d"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn 2.0.104",
-]
-
 [[package]]
 name = "autocfg"
 version = "1.5.0"
@@ -1529,6 +1533,216 @@ dependencies = [
  "static_assertions",
 ]
 
+[[package]]
+name = "compio"
+version = "0.15.0"
+source = 
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc";
+dependencies = [
+ "compio-buf",
+ "compio-dispatcher",
+ "compio-driver",
+ "compio-fs",
+ "compio-io",
+ "compio-log",
+ "compio-macros",
+ "compio-net",
+ "compio-process",
+ "compio-quic",
+ "compio-runtime",
+ "compio-signal",
+ "compio-tls",
+]
+
+[[package]]
+name = "compio-buf"
+version = "0.6.0"
+source = 
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc";
+dependencies = [
+ "arrayvec",
+ "bytes",
+ "libc",
+]
+
+[[package]]
+name = "compio-dispatcher"
+version = "0.7.0"
+source = 
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc";
+dependencies = [
+ "compio-driver",
+ "compio-runtime",
+ "flume",
+ "futures-channel",
+]
+
+[[package]]
+name = "compio-driver"
+version = "0.8.1"
+source = 
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc";
+dependencies = [
+ "aligned-array",
+ "cfg-if",
+ "cfg_aliases",
+ "compio-buf",
+ "compio-log",
+ "crossbeam-channel",
+ "crossbeam-queue",
+ "futures-util",
+ "io-uring",
+ "io_uring_buf_ring",
+ "libc",
+ "once_cell",
+ "paste",
+ "polling",
+ "slab",
+ "socket2",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-fs"
+version = "0.8.0"
+source = 
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc";
+dependencies = [
+ "cfg-if",
+ "cfg_aliases",
+ "compio-buf",
+ "compio-driver",
+ "compio-io",
+ "compio-runtime",
+ "libc",
+ "os_pipe",
+ "widestring",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-io"
+version = "0.7.0"
+source = 
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc";
+dependencies = [
+ "compio-buf",
+ "futures-util",
+ "paste",
+ "pin-project-lite",
+]
+
+[[package]]
+name = "compio-log"
+version = "0.1.0"
+source = 
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc";
+dependencies = [
+ "tracing",
+]
+
+[[package]]
+name = "compio-macros"
+version = "0.1.2"
+source = 
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc";
+dependencies = [
+ "proc-macro-crate 3.3.0",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.104",
+]
+
+[[package]]
+name = "compio-net"
+version = "0.8.0"
+source = 
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc";
+dependencies = [
+ "cfg-if",
+ "compio-buf",
+ "compio-driver",
+ "compio-io",
+ "compio-runtime",
+ "either",
+ "libc",
+ "once_cell",
+ "socket2",
+ "widestring",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-process"
+version = "0.5.0"
+source = 
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc";
+dependencies = [
+ "cfg-if",
+ "compio-buf",
+ "compio-driver",
+ "compio-io",
+ "compio-runtime",
+ "futures-util",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-quic"
+version = "0.4.0"
+source = 
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc";
+dependencies = [
+ "cfg_aliases",
+ "compio-buf",
+ "compio-io",
+ "compio-log",
+ "compio-net",
+ "compio-runtime",
+ "flume",
+ "futures-util",
+ "libc",
+ "quinn-proto",
+ "rustc-hash 2.1.1",
+ "rustls",
+ "thiserror 2.0.12",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-runtime"
+version = "0.8.1"
+source = 
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc";
+dependencies = [
+ "async-task",
+ "cfg-if",
+ "compio-buf",
+ "compio-driver",
+ "compio-log",
+ "crossbeam-queue",
+ "futures-util",
+ "libc",
+ "once_cell",
+ "scoped-tls",
+ "slab",
+ "socket2",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-signal"
+version = "0.6.0"
+source = 
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc";
+dependencies = [
+ "compio-buf",
+ "compio-driver",
+ "compio-runtime",
+ "libc",
+ "once_cell",
+ "os_pipe",
+ "slab",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-tls"
+version = "0.6.0"
+source = 
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc";
+dependencies = [
+ "compio-buf",
+ "compio-io",
+ "rustls",
+]
+
 [[package]]
 name = "concurrent-queue"
 version = "2.5.0"
@@ -1696,6 +1910,17 @@ version = "0.8.7"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
 
+[[package]]
+name = "core_affinity"
+version = "0.8.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "a034b3a7b624016c6e13f5df875747cc25f884156aad2abd12b6c46797971342"
+dependencies = [
+ "libc",
+ "num_cpus",
+ "winapi",
+]
+
 [[package]]
 name = "cpufeatures"
 version = "0.2.17"
@@ -1846,7 +2071,7 @@ version = "3.4.7"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "46f93780a459b7d656ef7f071fe699c4d3d2cb201c4b24d085b6ddc505276e73"
 dependencies = [
- "nix 0.30.1",
+ "nix",
  "windows-sys 0.59.0",
 ]
 
@@ -2682,15 +2907,6 @@ dependencies = [
  "slab",
 ]
 
-[[package]]
-name = "fxhash"
-version = "0.2.1"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
-dependencies = [
- "byteorder",
-]
-
 [[package]]
 name = "generator"
 version = "0.8.5"
@@ -4143,14 +4359,26 @@ dependencies = [
 
 [[package]]
 name = "io-uring"
-version = "0.6.4"
+version = "0.7.8"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "595a0399f411a508feb2ec1e970a4a30c249351e30208960d58298de8660b0e5"
+checksum = "b86e202f00093dcba4275d4636b93ef9dd75d025ae560d2521b45ea28ab49013"
 dependencies = [
- "bitflags 1.3.2",
+ "bitflags 2.9.1",
+ "cfg-if",
  "libc",
 ]
 
+[[package]]
+name = "io_uring_buf_ring"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "8a8867874ff5758b47c1dac069e6e86541432f9da8be9111c5e94154134f07d0"
+dependencies = [
+ "bytes",
+ "io-uring",
+ "rustix 1.0.7",
+]
+
 [[package]]
 name = "ipnet"
 version = "2.11.0"
@@ -4657,15 +4885,6 @@ version = "2.7.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0"
 
-[[package]]
-name = "memoffset"
-version = "0.7.1"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4"
-dependencies = [
- "autocfg",
-]
-
 [[package]]
 name = "mimalloc"
 version = "0.1.47"
@@ -4715,18 +4934,6 @@ dependencies = [
  "adler2",
 ]
 
-[[package]]
-name = "mio"
-version = "0.8.11"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
-dependencies = [
- "libc",
- "log",
- "wasi 0.11.1+wasi-snapshot-preview1",
- "windows-sys 0.48.0",
-]
-
 [[package]]
 name = "mio"
 version = "1.0.4"
@@ -4787,62 +4994,6 @@ dependencies = [
  "uuid",
 ]
 
-[[package]]
-name = "monoio"
-version = "0.2.4"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "3bd0f8bcde87b1949f95338b547543fcab187bc7e7a5024247e359a5e828ba6a"
-dependencies = [
- "auto-const-array",
- "bytes",
- "flume",
- "fxhash",
- "io-uring",
- "libc",
- "memchr",
- "mio 0.8.11",
- "monoio-macros",
- "nix 0.26.4",
- "once_cell",
- "pin-project-lite",
- "socket2",
- "threadpool",
- "windows-sys 0.48.0",
-]
-
-[[package]]
-name = "monoio-io-wrapper"
-version = "0.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "4bcfaa76e5daf87cc4d31b4d1b6bc93c12db59c19df50b9200afdbde42077655"
-dependencies = [
- "monoio",
-]
-
-[[package]]
-name = "monoio-macros"
-version = "0.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "176a5f5e69613d9e88337cf2a65e11135332b4efbcc628404a7c555e4452084c"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn 2.0.104",
-]
-
-[[package]]
-name = "monoio-rustls"
-version = "0.4.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "6e31f422825bd7fb19957af6eaf89d7234ba143fcc0e515f5a2f526e332d1875"
-dependencies = [
- "bytes",
- "monoio",
- "monoio-io-wrapper",
- "rustls",
- "thiserror 1.0.69",
-]
-
 [[package]]
 name = "nanorand"
 version = "0.7.0"
@@ -4858,19 +5009,6 @@ version = "6.6.666"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "cf5a574dadd7941adeaa71823ecba5e28331b8313fb2e1c6a5c7e5981ea53ad6"
 
-[[package]]
-name = "nix"
-version = "0.26.4"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b"
-dependencies = [
- "bitflags 1.3.2",
- "cfg-if",
- "libc",
- "memoffset",
- "pin-utils",
-]
-
 [[package]]
 name = "nix"
 version = "0.30.1"
@@ -4935,7 +5073,7 @@ dependencies = [
  "kqueue",
  "libc",
  "log",
- "mio 1.0.4",
+ "mio",
  "notify-types",
  "walkdir",
  "windows-sys 0.59.0",
@@ -5351,6 +5489,16 @@ dependencies = [
  "hashbrown 0.14.5",
 ]
 
+[[package]]
+name = "os_pipe"
+version = "1.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "db335f4760b14ead6290116f2427bf33a14d4f0617d49f78a246de10c1831224"
+dependencies = [
+ "libc",
+ "windows-sys 0.59.0",
+]
+
 [[package]]
 name = "overload"
 version = "0.1.1"
@@ -5602,6 +5750,21 @@ version = "0.3.32"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c"
 
+[[package]]
+name = "polling"
+version = "3.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "b53a684391ad002dd6a596ceb6c74fd004fdce75f4be2e3f615068abbea5fd50"
+dependencies = [
+ "cfg-if",
+ "concurrent-queue",
+ "hermit-abi",
+ "pin-project-lite",
+ "rustix 1.0.7",
+ "tracing",
+ "windows-sys 0.59.0",
+]
+
 [[package]]
 name = "polonius-the-crab"
 version = "0.2.1"
@@ -6874,7 +7037,9 @@ dependencies = [
  "bytes",
  "chrono",
  "clap",
+ "compio",
  "console-subscriber",
+ "core_affinity",
  "crossbeam",
  "ctrlc",
  "dashmap",
@@ -6894,9 +7059,7 @@ dependencies = [
  "mimalloc",
  "mockall",
  "moka",
- "monoio",
- "monoio-rustls",
- "nix 0.30.1",
+ "nix",
  "once_cell",
  "opentelemetry",
  "opentelemetry-appender-tracing",
@@ -7409,15 +7572,6 @@ dependencies = [
  "cfg-if",
 ]
 
-[[package]]
-name = "threadpool"
-version = "1.8.1"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
-dependencies = [
- "num_cpus",
-]
-
 [[package]]
 name = "time"
 version = "0.3.41"
@@ -7494,7 +7648,7 @@ dependencies = [
  "backtrace",
  "bytes",
  "libc",
- "mio 1.0.4",
+ "mio",
  "parking_lot 0.12.4",
  "pin-project-lite",
  "signal-hook-registry",
@@ -8330,6 +8484,12 @@ dependencies = [
  "safe_arch",
 ]
 
+[[package]]
+name = "widestring"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "dd7cf3379ca1aac9eea11fba24fd7e315d621f8dfe35c8d7d2be8b793726e07d"
+
 [[package]]
 name = "winapi"
 version = "0.3.9"
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index f91ac15c..57ab996a 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -63,12 +63,12 @@ human-repr = { workspace = true }
 iggy_common = { workspace = true }
 jsonwebtoken = "9.3.1"
 socket2 = "0.5.10"
+core_affinity = "0.8.0"
 lending-iterator = "0.1.7"
 hash32 = "1.0.0"
 mimalloc = { workspace = true, optional = true }
 moka = { version = "0.12.10", features = ["future"] }
-monoio = { version = "0.2.4", features = ["mkdirat", "unlinkat", "renameat", 
"sync"] }
-monoio-rustls = "0.4.0"
+compio =  { git = "https://github.com/compio-rs/compio.git";, rev = 
"5504e02ec2e821e9b024006224c34c06646410cc", features = ["runtime", "macros", 
"io-uring", "time", "rustls"] }
 nix = { version = "0.30", features = ["fs"] }
 once_cell = "1.21.3"
 opentelemetry = { version = "0.30.0", features = ["trace", "logs"] }
diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs 
b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
index dfc4a89f..2f02a039 100644
--- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
@@ -27,8 +27,10 @@ use crate::shard::transmission::frame::ShardResponse;
 use crate::shard::transmission::message::{ShardMessage, ShardRequest, 
ShardRequestPayload};
 use crate::streaming::segments::IggyMessagesBatchSet;
 use crate::streaming::session::Session;
+use crate::streaming::utils::PooledBuffer;
 use crate::to_iovec;
 use anyhow::Result;
+use bytes::BytesMut;
 use error_set::ErrContext;
 use iggy_common::{IggyError, PollMessages};
 use std::io::IoSlice;
@@ -76,7 +78,7 @@ impl ServerCommandHandler for PollMessages {
 
         let user_id = session.get_user_id();
         let client_id = session.client_id;
-        let (metadata, batch) = shard
+        let (metadata, mut batch) = shard
             .poll_messages(
                 client_id,
                 user_id,
@@ -96,16 +98,19 @@ impl ServerCommandHandler for PollMessages {
         let response_length = 4 + 8 + 4 + batch.size();
         let response_length_bytes = response_length.to_le_bytes();
 
-        let partition_id = metadata.partition_id.to_le_bytes();
-        let current_offset = metadata.current_offset.to_le_bytes();
-        let count = batch.count().to_le_bytes();
+        let mut bufs = Vec::with_capacity(batch.containers_count() + 5);
+        let mut partition_id_buf = PooledBuffer::with_capacity(4);
+        let mut current_offset_buf = PooledBuffer::with_capacity(8);
+        let mut count_buf = PooledBuffer::with_capacity(4);
+        partition_id_buf.put_u32_le(metadata.partition_id);
+        current_offset_buf.put_u64_le(metadata.current_offset);
+        count_buf.put_u32_le(batch.count());
 
-        let mut iovecs = Vec::with_capacity(batch.containers_count() + 3);
-        iovecs.push(to_iovec(&partition_id));
-        iovecs.push(to_iovec(&current_offset));
-        iovecs.push(to_iovec(&count));
+        bufs.push(partition_id_buf);
+        bufs.push(current_offset_buf);
+        bufs.push(count_buf);
 
-        iovecs.extend(batch.iter().map(|m| to_iovec(&m)));
+        batch.iter_mut().for_each(|m| bufs.push(m.take_messages()));
         trace!(
             "Sending {} messages to client ({} bytes) to client",
             batch.count(),
@@ -113,7 +118,7 @@ impl ServerCommandHandler for PollMessages {
         );
 
         sender
-            .send_ok_response_vectored(&response_length_bytes, iovecs)
+            .send_ok_response_vectored(&response_length_bytes, bufs)
             .await?;
         Ok(())
     }
diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs 
b/core/server/src/binary/handlers/topics/create_topic_handler.rs
index 6785e4f0..51088b55 100644
--- a/core/server/src/binary/handlers/topics/create_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs
@@ -48,7 +48,7 @@ impl ServerCommandHandler for CreateTopic {
         debug!("session: {session}, command: {self}");
         let stream_id = self.stream_id.clone();
         let topic_id = self.topic_id;
-        let created_topic_id = shard
+        let (shards_assignment, created_topic_id) = shard
                 .create_topic(
                     session,
                     &self.stream_id,
@@ -72,6 +72,7 @@ impl ServerCommandHandler for CreateTopic {
             compression_algorithm: self.compression_algorithm,
             max_topic_size: self.max_topic_size,
             replication_factor: self.replication_factor,
+            shards_assignment,
         };
         // Broadcast the event to all shards.
         let _responses = shard.broadcast_event_to_all_shards(event.into());
diff --git a/core/server/src/binary/sender.rs b/core/server/src/binary/sender.rs
index 45066c75..96bed3d5 100644
--- a/core/server/src/binary/sender.rs
+++ b/core/server/src/binary/sender.rs
@@ -19,17 +19,17 @@
 use std::future::Future;
 use std::io::IoSlice;
 
+use crate::streaming::utils::PooledBuffer;
 use crate::tcp::tcp_sender::TcpSender;
 use crate::tcp::tcp_tls_sender::TcpTlsSender;
 use crate::{quic::quic_sender::QuicSender, server_error::ServerError};
 use bytes::BytesMut;
+use compio::buf::{IoBuf, IoBufMut};
+use compio::io::{AsyncReadExt, AsyncWriteExt};
+use compio::net::TcpStream;
 use iggy_common::IggyError;
-use monoio::buf::IoBufMut;
-use monoio::io::{AsyncReadRent, AsyncWriteRent};
-use monoio::net::TcpStream;
 use nix::libc;
 use quinn::{RecvStream, SendStream};
-use monoio_rustls::{ServerTlsStream, TlsStream};
 
 macro_rules! forward_async_methods {
     (
@@ -65,7 +65,7 @@ pub trait Sender {
     fn send_ok_response_vectored(
         &mut self,
         length: &[u8],
-        slices: Vec<libc::iovec>,
+        slices: Vec<PooledBuffer>,
     ) -> impl Future<Output = Result<(), IggyError>>;
     fn send_error_response(
         &mut self,
@@ -85,8 +85,9 @@ impl SenderKind {
         Self::Tcp(TcpSender { stream })
     }
 
-    pub fn get_tcp_tls_sender(stream: ServerTlsStream<TcpStream>) -> Self {
-        Self::TcpTls(TcpTlsSender { stream })
+    pub fn get_tcp_tls_sender(stream: ()) -> Self {
+        todo!();
+        //Self::TcpTls(TcpTlsSender { stream })
     }
 
     pub fn get_quic_sender(send_stream: SendStream, recv_stream: RecvStream) 
-> Self {
@@ -100,7 +101,7 @@ impl SenderKind {
         async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<usize, 
IggyError>, B);
         async fn send_empty_ok_response(&mut self) -> Result<(), IggyError>;
         async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(), 
IggyError>;
-        async fn send_ok_response_vectored(&mut self, length: &[u8], slices: 
Vec<libc::iovec>) -> Result<(), IggyError>;
+        async fn send_ok_response_vectored(&mut self, length: &[u8], slices: 
Vec<PooledBuffer>) -> Result<(), IggyError>;
         async fn send_error_response(&mut self, error: IggyError) -> 
Result<(), IggyError>;
         async fn shutdown(&mut self) -> Result<(), ServerError>;
     }
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index ddd0599c..6a2eb8af 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -1,3 +1,4 @@
+use compio::{fs::create_dir_all, runtime::Runtime};
 use iggy_common::{
     IggyError,
     defaults::{
@@ -5,7 +6,6 @@ use iggy_common::{
         MIN_PASSWORD_LENGTH, MIN_USERNAME_LENGTH,
     },
 };
-use monoio::{Buildable, Driver, Runtime, fs::create_dir_all, time::TimeDriver};
 use tracing::info;
 
 use crate::{
@@ -121,24 +121,21 @@ pub fn create_root_user() -> User {
     user
 }
 
-pub fn create_default_executor<D>() -> Runtime<D>
-where
-    D: Driver + Buildable,
-{
-    let builder = monoio::RuntimeBuilder::<D>::new();
-    let rt = Buildable::build(builder).expect("Failed to create default 
runtime");
-    rt
-}
-
-pub fn create_shard_executor() -> Runtime<TimeDriver<monoio::IoUringDriver>> {
-    // TODO: Figure out what else we could tweak there
-    // We for sure want to disable the userspace interrupts on new cq entry 
(set_coop_taskrun)
-    // TODO: Shall we make the size of ring be configureable ?
-    let builder = monoio::RuntimeBuilder::<monoio::IoUringDriver>::new()
-        //.uring_builder(urb.setup_coop_taskrun()) // broken shit.
-        .with_entries(1024) // Default size
-        .enable_timer();
-    let rt = Buildable::build(builder).expect("Failed to create default 
runtime");
+pub fn create_shard_executor() -> Runtime {
+    //TODO: The event intererval tick, could be configureed based on the fact
+    // How many clients we expect to have connected.
+    // This roughly estimates the number of tasks we will create.
+    let proactor = compio::driver::ProactorBuilder::new()
+        .capacity(4096)
+        .coop_taskrun(true)
+        .taskrun_flag(false) //TODO: Try enabling this.
+        .thread_pool_limit(0)
+        .to_owned();
+    let rt = compio::runtime::RuntimeBuilder::new()
+        .with_proactor(proactor)
+        .event_interval(69)
+        .build()
+        .unwrap();
     rt
 }
 
diff --git a/core/server/src/compat/index_rebuilding/index_rebuilder.rs 
b/core/server/src/compat/index_rebuilding/index_rebuilder.rs
index 1eebdacd..dbc93ebe 100644
--- a/core/server/src/compat/index_rebuilding/index_rebuilder.rs
+++ b/core/server/src/compat/index_rebuilding/index_rebuilder.rs
@@ -16,12 +16,14 @@
  * under the License.
  */
 
-use crate::io::file::IggyFile;
-use crate::io::writer::IggyWriter;
+use crate::server_error::CompatError;
 use crate::streaming::utils::file;
-use crate::{io::reader::IggyReader, server_error::CompatError};
+use async_zip::tokio::write;
+use compio::{
+    fs::File,
+    io::{AsyncBufRead, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, 
BufReader, BufWriter},
+};
 use iggy_common::{IGGY_MESSAGE_HEADER_SIZE, IggyMessageHeader};
-use monoio::io::{AsyncReadRent, AsyncReadRentExt, AsyncWriteRent, 
AsyncWriteRentExt};
 use std::io::{Seek, SeekFrom};
 
 pub struct IndexRebuilder {
@@ -40,17 +42,17 @@ impl IndexRebuilder {
     }
 
     async fn read_message_header(
-        reader: &mut IggyReader<IggyFile>,
+        reader: &mut BufReader<std::io::Cursor<File>>,
     ) -> Result<IggyMessageHeader, std::io::Error> {
         let buf = [0u8; IGGY_MESSAGE_HEADER_SIZE];
-        let (result, buf) = reader.read_exact(Box::new(buf)).await;
+        let (result, buf) = reader.read_exact(Box::new(buf)).await.into();
         result?;
         IggyMessageHeader::from_raw_bytes(&*buf)
             .map_err(|_| std::io::Error::from(std::io::ErrorKind::InvalidData))
     }
 
     async fn write_index_entry(
-        writer: &mut IggyWriter<IggyFile>,
+        writer: &mut BufWriter<std::io::Cursor<File>>,
         header: &IggyMessageHeader,
         position: usize,
         start_offset: u64,
@@ -58,26 +60,34 @@ impl IndexRebuilder {
         // Write offset (4 bytes) - base_offset + last_offset_delta - 
start_offset
         let offset = start_offset - header.offset;
         debug_assert!(offset <= u32::MAX as u64);
-        let (result, _) = 
writer.write_all(Box::new(offset.to_le_bytes())).await;
+        let (result, _) = writer
+            .write_all(Box::new(offset.to_le_bytes()))
+            .await
+            .into();
         result?;
 
         // Write position (4 bytes)
-        let (result, _) = 
writer.write_all(Box::new(position.to_le_bytes())).await;
+        let (result, _) = writer
+            .write_all(Box::new(position.to_le_bytes()))
+            .await
+            .into();
         result?;
 
         // Write timestamp (8 bytes)
         let (result, _) = writer
             .write_all(Box::new(header.timestamp.to_le_bytes()))
-            .await;
+            .await
+            .into();
         result?;
 
         Ok(())
     }
 
     pub async fn rebuild(&self) -> Result<(), CompatError> {
-        let mut reader =
-            
IggyReader::new(IggyFile::new(file::open(&self.messages_file_path).await?));
-        let mut writer = 
IggyWriter::new(IggyFile::new(file::overwrite(&self.index_path).await?));
+        let read_cursor = 
std::io::Cursor::new(file::open(&self.messages_file_path).await?);
+        let write_cursor = 
std::io::Cursor::new(file::overwrite(&self.index_path).await?);
+        let mut reader = BufReader::new(read_cursor);
+        let mut writer = BufWriter::new(write_cursor);
         let mut position = 0;
         let mut next_position;
 
@@ -93,9 +103,9 @@ impl IndexRebuilder {
                         .await?;
 
                     // Skip message payload and headers
-                    reader.seek(SeekFrom::Current(
-                        header.payload_length as i64 + 
header.user_headers_length as i64,
-                    ))?;
+                    reader.consume(
+                        header.payload_length as usize + 
header.user_headers_length as usize,
+                    );
 
                     // Update position for next iteration
                     position = next_position;
diff --git a/core/server/src/io/file.rs b/core/server/src/io/file.rs
deleted file mode 100644
index 95129aa8..00000000
--- a/core/server/src/io/file.rs
+++ /dev/null
@@ -1,117 +0,0 @@
-use std::io::SeekFrom;
-
-use monoio::{
-    BufResult,
-    buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut, IoVecWrapper},
-    fs::{File, Metadata},
-    io::{AsyncReadRent, AsyncWriteRent},
-};
-
-/// Wrapper around `monoio::fs::File` to provide a consistent API for reading 
and writing files
-/// in an asynchronous context. This struct maintains the current position in 
the file and provides
-/// methods to read and write data at specific positions.
-#[derive(Debug)]
-pub struct IggyFile {
-    file: File,
-    position: u64,
-}
-
-impl From<File> for IggyFile {
-    fn from(file: File) -> Self {
-        Self { file, position: 0 }
-    }
-}
-
-impl std::io::Seek for IggyFile {
-    /// This method doesn't do bound checking aswell as, the `End` variant is 
not supported.
-    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
-        self.position = match pos {
-            SeekFrom::Start(n) => n as i64,
-            SeekFrom::End(_) => unreachable!("End variant is not supported in 
IggyFile"),
-            SeekFrom::Current(n) => self.position as i64 + n,
-        } as u64;
-        Ok(self.position)
-    }
-}
-
-impl IggyFile {
-    pub fn new(file: File) -> Self {
-        Self { file, position: 0 }
-    }
-
-    pub async fn metadata(&self) -> std::io::Result<Metadata> {
-        self.file.metadata().await
-    }
-
-    pub async fn write_at<T: IoBuf>(&self, buf: T, pos: usize) -> 
BufResult<usize, T> {
-        self.file.write_at(buf, pos as u64).await
-    }
-
-    pub async fn write_all_at<T: IoBuf>(&self, buf: T, pos: u64) -> 
BufResult<(), T> {
-        self.file.write_all_at(buf, pos).await
-    }
-
-    pub async fn read_exact_at<T: IoBufMut>(&self, buf: T, pos: u64) -> 
BufResult<(), T> {
-        self.file.read_exact_at(buf, pos).await
-    }
-
-    pub async fn read_at<T: IoBufMut>(&self, buf: T, pos: usize) -> 
BufResult<usize, T> {
-        self.file.read_at(buf, pos as u64).await
-    }
-
-    pub async fn sync_all(&self) -> std::io::Result<()> {
-        self.file.sync_all().await
-    }
-}
-
-impl AsyncReadRent for IggyFile {
-    /// Reads `exactly` `buf.len()` bytes into the buffer.
-    async fn read<T: IoBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
-        let (res, buf) = self.file.read_at(buf, self.position).await;
-        let n = match res {
-            Ok(n) => n,
-            Err(e) => return (Err(e), buf),
-        };
-        self.position += n as u64;
-        (Ok(n), buf)
-    }
-
-    //TODO(numinex) - maybe implement this ?
-    fn readv<T: IoVecBufMut>(&mut self, buf: T) -> impl Future<Output = 
BufResult<usize, T>> {
-        async move { (Ok(0), buf) }
-    }
-}
-
-impl AsyncWriteRent for IggyFile {
-    /// Writes entire buffer to the file at the current position.
-    async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
-        let (res, buf) = self.file.write_at(buf, self.position).await;
-        let n = match res {
-            Ok(n) => n,
-            Err(e) => return (Err(e), buf),
-        };
-        self.position += n as u64;
-        (Ok(n), buf)
-    }
-
-    // This is bait!!!!
-    async fn writev<T: IoVecBuf>(&mut self, buf_vec: T) -> BufResult<usize, T> 
{
-        unimplemented!("writev is not implemented for IggyFile");
-    }
-
-    async fn flush(&mut self) -> std::io::Result<()> {
-        self.file.sync_all().await
-    }
-
-    //TODO(numinex) - How to implement this ?
-    async fn shutdown(&mut self) -> std::io::Result<()> {
-        panic!("shutdown is not supported for IggyFile");
-        /*
-                self.file.sync_all().await;
-                unsafe {
-                    let file = *self.file;
-                    file.close().await
-                }
-        */
-    }
-}
diff --git a/core/server/src/io/fs_utils.rs b/core/server/src/io/fs_utils.rs
index d9be21e7..6bbc1b7f 100644
--- a/core/server/src/io/fs_utils.rs
+++ b/core/server/src/io/fs_utils.rs
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-use monoio::fs;
+use compio::fs;
 use std::io;
 use std::path::{Path, PathBuf};
 
diff --git a/core/server/src/io/mod.rs b/core/server/src/io/mod.rs
index 686717cd..c3140c9e 100644
--- a/core/server/src/io/mod.rs
+++ b/core/server/src/io/mod.rs
@@ -1,4 +1 @@
-pub mod file;
 pub mod fs_utils;
-pub mod reader;
-pub mod writer;
diff --git a/core/server/src/io/reader.rs b/core/server/src/io/reader.rs
deleted file mode 100644
index 63443214..00000000
--- a/core/server/src/io/reader.rs
+++ /dev/null
@@ -1,44 +0,0 @@
-use std::io::{Seek, SeekFrom};
-
-use monoio::{
-    BufResult,
-    buf::{IoBufMut, IoVecBufMut},
-    io::{AsyncReadRent, BufReader},
-};
-
-pub struct IggyReader<R: AsyncReadRent + Seek> {
-    inner: BufReader<R>,
-}
-
-impl<R: AsyncReadRent + Seek> IggyReader<R> {
-    pub fn new(reader: R) -> Self {
-        Self {
-            inner: BufReader::new(reader),
-        }
-    }
-
-    pub fn with_capacity(capacity: usize, reader: R) -> Self {
-        Self {
-            inner: BufReader::with_capacity(capacity, reader),
-        }
-    }
-}
-
-impl<R: AsyncReadRent + Seek> Seek for IggyReader<R> {
-    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
-        self.inner.get_mut().seek(pos)
-    }
-}
-
-impl<R> AsyncReadRent for IggyReader<R>
-where
-    R: AsyncReadRent + Seek,
-{
-    fn read<T: IoBufMut>(&mut self, buf: T) -> impl Future<Output = 
BufResult<usize, T>> {
-        self.inner.read(buf)
-    }
-
-    fn readv<T: IoVecBufMut>(&mut self, buf: T) -> impl Future<Output = 
BufResult<usize, T>> {
-        self.inner.readv(buf)
-    }
-}
diff --git a/core/server/src/io/writer.rs b/core/server/src/io/writer.rs
deleted file mode 100644
index e249e405..00000000
--- a/core/server/src/io/writer.rs
+++ /dev/null
@@ -1,57 +0,0 @@
-use std::io::{Seek, SeekFrom};
-
-use monoio::{
-    BufResult,
-    io::{AsyncWriteRent, BufWriter},
-};
-
-pub struct IggyWriter<W: AsyncWriteRent + Seek> {
-    inner: BufWriter<W>,
-}
-
-impl<W: AsyncWriteRent + Seek> IggyWriter<W> {
-    pub fn new(writer: W) -> Self {
-        Self {
-            inner: BufWriter::new(writer),
-        }
-    }
-
-    pub fn with_capacity(capacity: usize, writer: W) -> Self {
-        Self {
-            inner: BufWriter::with_capacity(capacity, writer),
-        }
-    }
-}
-
-impl<W: AsyncWriteRent + Seek> Seek for IggyWriter<W> {
-    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
-        self.inner.get_mut().seek(pos)
-    }
-}
-
-impl<W> AsyncWriteRent for IggyWriter<W>
-where
-    W: AsyncWriteRent + Seek,
-{
-    fn write<T: monoio::buf::IoBuf>(
-        &mut self,
-        buf: T,
-    ) -> impl Future<Output = BufResult<usize, T>> {
-        self.inner.write(buf)
-    }
-
-    fn writev<T: monoio::buf::IoVecBuf>(
-        &mut self,
-        buf_vec: T,
-    ) -> impl Future<Output = BufResult<usize, T>> {
-        self.inner.writev(buf_vec)
-    }
-
-    fn flush(&mut self) -> impl Future<Output = std::io::Result<()>> {
-        self.inner.flush()
-    }
-
-    fn shutdown(&mut self) -> impl Future<Output = std::io::Result<()>> {
-        self.inner.shutdown()
-    }
-}
diff --git a/core/server/src/log/runtime.rs b/core/server/src/log/runtime.rs
new file mode 100644
index 00000000..36bbdd80
--- /dev/null
+++ b/core/server/src/log/runtime.rs
@@ -0,0 +1,48 @@
+use std::{pin::Pin, time::Duration};
+
+use futures::{FutureExt, SinkExt, Stream};
+use opentelemetry_sdk::runtime::{Runtime, RuntimeChannel, TrySend};
+
+#[derive(Clone)]
+pub struct MonoioRuntime;
+
+impl Runtime for MonoioRuntime {
+    fn spawn<F>(&self, future: F)
+    where
+        F: Future<Output = ()> + Send + 'static,
+    {
+        // TODO: This wont' work, we init Opentelemetry in the main thread, 
when there is no instance of monoio runtime
+        // running yet....
+        monoio::spawn(future);
+    }
+
+    fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 
'static {
+        let sleep = Sleep::new(duration);
+        sleep
+    }
+}
+
+
+pub struct Sleep {
+    pub inner: Pin<Box<monoio::time::Sleep>>,
+}
+
+impl Sleep {
+    pub fn new(duration: Duration) -> Self {
+        Self {
+            inner: Box::pin(monoio::time::sleep(duration)),
+        }
+    }
+}
+
+impl Future for Sleep {
+    type Output = ();
+
+    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut 
std::task::Context<'_>) -> std::task::Poll<Self::Output> {
+        self.inner.as_mut().poll_unpin(cx)
+    }
+}
+
+// Safety: There is no way for `Sleep` future to be flipped like a burger to 
another thread,
+// because we create instance of OpenTelemetry SDK runtime in the main thread, 
and monoio futures don't require Send & Sync bounds.
+unsafe impl Send for Sleep {}
\ No newline at end of file
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 53fa2d7b..5d5f8698 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -30,8 +30,8 @@ use iggy_common::defaults::DEFAULT_ROOT_USER_ID;
 use iggy_common::{Aes256GcmEncryptor, EncryptorKind, IggyError};
 use server::args::Args;
 use server::bootstrap::{
-    create_default_executor, create_directories, create_root_user, 
create_shard_connections,
-    create_shard_executor, load_config, resolve_persister,
+    create_directories, create_root_user, create_shard_connections, 
create_shard_executor,
+    load_config, resolve_persister,
 };
 use server::configs::config_provider::{self};
 use server::configs::server::ServerConfig;
@@ -84,7 +84,8 @@ fn main() -> Result<(), ServerError> {
     let config_provider = config_provider::resolve(&args.config_provider)?;
     // Load config and create directories.
     // Remove `local_data` directory if run with `--fresh` flag.
-    let mut rt = create_default_executor::<monoio::IoUringDriver>();
+    // TODO: Replace this once we use `monoio::main` macro.
+    let rt = create_shard_executor();
     let config = rt
         .block_on(async {
             let config = load_config(&config_provider)
@@ -94,7 +95,7 @@ fn main() -> Result<(), ServerError> {
                 })?;
             if args.fresh {
                 let system_path = config.system.get_system_path();
-                if monoio::fs::metadata(&system_path).await.is_ok() {
+                if compio::fs::metadata(&system_path).await.is_ok() {
                     println!(
                         "Removing system path at: {} because `--fresh` flag 
was set",
                         system_path
@@ -122,6 +123,7 @@ fn main() -> Result<(), ServerError> {
 
     // TODO: Make this configurable from config as a range
     // for example this instance of Iggy will use cores from 0..4
+    let core_ids = core_affinity::get_core_ids().unwrap();
     let available_cpus = available_parallelism().expect("Failed to get num of 
cores");
     let shards_count = available_cpus.into();
     let shards_set = 0..shards_count;
@@ -136,14 +138,24 @@ fn main() -> Result<(), ServerError> {
         let config = config.clone();
         let state_persister = 
resolve_persister(config.system.state.enforce_fsync);
 
+        let core_ids = core_ids.clone();
         let handle = std::thread::Builder::new()
             .name(format!("shard-{id}"))
             .spawn(move || {
+                let core_ids = core_ids.clone();
                 MemoryPool::init_pool(config.system.clone());
-                monoio::utils::bind_to_cpu_set(Some(shard_id))
+                if core_ids.len() > shard_id {
+                    core_affinity::set_for_current(core_ids[shard_id]);
+                }
+
+                // TODO: Fix this once this PR gets resolved.
+                // https://github.com/compio-rs/compio/pull/445 
+                /*
+                compio::utils::bind_to_cpu_set(Some(shard_id))
                     .unwrap_or_else(|e| panic!("Failed to set CPU affinity for 
shard-{id}: {e}"));
+                */
 
-                let mut rt = create_shard_executor();
+                let rt = create_shard_executor();
                 rt.block_on(async move {
                     let version = SemanticVersion::current().expect("Invalid 
version");
                     info!(
@@ -253,20 +265,22 @@ fn main() -> Result<(), ServerError> {
     }
 
     let shutdown_handles_for_signal = shutdown_handles.clone();
-    ctrlc::set_handler(move || {
-        info!("Received shutdown signal (SIGTERM/SIGINT), initiating graceful 
shutdown...");
-
-        for (shard_id, stop_sender) in &shutdown_handles_for_signal {
-            info!("Sending shutdown signal to shard {}", shard_id);
-            if let Err(e) = stop_sender.send_blocking(()) {
-                error!(
-                    "Failed to send shutdown signal to shard {}: {}",
-                    shard_id, e
-                );
+    /*
+        ::set_handler(move || {
+            info!("Received shutdown signal (SIGTERM/SIGINT), initiating 
graceful shutdown...");
+
+            for (shard_id, stop_sender) in &shutdown_handles_for_signal {
+                info!("Sending shutdown signal to shard {}", shard_id);
+                if let Err(e) = stop_sender.send_blocking(()) {
+                    error!(
+                        "Failed to send shutdown signal to shard {}: {}",
+                        shard_id, e
+                    );
+                }
             }
-        }
-    })
-    .expect("Error setting Ctrl-C handler");
+        })
+        .expect("Error setting Ctrl-C handler");
+    */
 
     info!("Iggy server is running. Press Ctrl+C or send SIGTERM to shutdown.");
     for (idx, handle) in handles.into_iter().enumerate() {
diff --git a/core/server/src/quic/quic_sender.rs 
b/core/server/src/quic/quic_sender.rs
index c6403349..54bda604 100644
--- a/core/server/src/quic/quic_sender.rs
+++ b/core/server/src/quic/quic_sender.rs
@@ -17,11 +17,12 @@
  */
 
 use crate::quic::COMPONENT;
+use crate::streaming::utils::PooledBuffer;
 use crate::{binary::sender::Sender, server_error::ServerError};
 use bytes::BytesMut;
+use compio::buf::{IoBuf, IoBufMut};
 use error_set::ErrContext;
 use iggy_common::IggyError;
-use monoio::buf::IoBufMut;
 use nix::libc;
 use quinn::{RecvStream, SendStream};
 use std::io::IoSlice;
@@ -71,7 +72,7 @@ impl Sender for QuicSender {
     async fn send_ok_response_vectored(
         &mut self,
         length: &[u8],
-        slices: Vec<libc::iovec>,
+        slices: Vec<PooledBuffer>,
     ) -> Result<(), IggyError> {
         debug!("Sending vectored response with status: {:?}...", STATUS_OK);
 
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 8ffc6560..2178d2b3 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -110,6 +110,7 @@ impl Shard {
     }
 }
 
+#[derive(Debug, Clone, Eq, PartialEq, Hash)]
 pub struct ShardInfo {
     id: u16,
 }
@@ -118,6 +119,10 @@ impl ShardInfo {
     pub fn new(id: u16) -> Self {
         Self { id }
     }
+
+    pub fn id(&self) -> u16 {
+        self.id
+    }
 }
 
 pub struct IggyShard {
@@ -193,7 +198,8 @@ impl IggyShard {
         let stop_receiver = self.get_stop_receiver();
         let shard_for_shutdown = self.clone();
 
-        monoio::spawn(async move {
+        /*
+        compio::runtime::spawn(async move {
             let _ = stop_receiver.recv().await;
             info!("Shard {} received shutdown signal", shard_for_shutdown.id);
 
@@ -207,6 +213,7 @@ impl IggyShard {
                 );
             }
         });
+        */
 
         let result = try_join_all(tasks).await;
         result?;
@@ -506,7 +513,7 @@ impl IggyShard {
     async fn handle_event(&self, event: Arc<ShardEvent>) -> Result<(), 
IggyError> {
         match &*event {
             ShardEvent::CreatedStream { stream_id, name } => {
-                self.create_stream_bypass_auth(*stream_id, name).await
+                self.create_stream_bypass_auth(*stream_id, name)
             }
             ShardEvent::CreatedTopic {
                 stream_id,
@@ -517,6 +524,7 @@ impl IggyShard {
                 compression_algorithm,
                 max_topic_size,
                 replication_factor,
+                shards_assignment,
             } => {
                 self.create_topic_bypass_auth(
                     stream_id,
@@ -527,6 +535,7 @@ impl IggyShard {
                     *compression_algorithm,
                     *max_topic_size,
                     *replication_factor,
+                    shards_assignment.clone()
                 )
                 .await
             }
@@ -604,7 +613,7 @@ impl IggyShard {
 
     pub fn insert_shard_table_records(
         &self,
-        records: impl Iterator<Item = (IggyNamespace, ShardInfo)>,
+        records: impl IntoIterator<Item = (IggyNamespace, ShardInfo)>,
     ) {
         self.shards_table.borrow_mut().extend(records);
     }
diff --git a/core/server/src/shard/system/snapshot/mod.rs 
b/core/server/src/shard/system/snapshot/mod.rs
index eb726509..f6bf7857 100644
--- a/core/server/src/shard/system/snapshot/mod.rs
+++ b/core/server/src/shard/system/snapshot/mod.rs
@@ -23,14 +23,14 @@ use crate::shard::IggyShard;
 use crate::streaming::session::Session;
 use async_zip::tokio::write::ZipFileWriter;
 use async_zip::{Compression, ZipEntryBuilder};
+use compio::fs::{File, OpenOptions};
+use compio::io::{AsyncReadAtExt, AsyncWriteAtExt};
 use iggy_common::{IggyDuration, IggyError, Snapshot, SnapshotCompression, 
SystemSnapshotType};
-use monoio::fs::{File, OpenOptions};
 use std::io::Cursor;
 use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::Instant;
 use tempfile::NamedTempFile;
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
 use tokio::process::Command;
 use tokio_util::compat::TokioAsyncWriteCompatExt;
 use tracing::{error, info};
@@ -59,6 +59,8 @@ impl IggyShard {
         // and impl the monoio async writer, based on this example:
         // https://youtu.be/RYHYiXMJdZI?si=d2roKeHn5lJrw2ri&t=1140
         // and rc-zip-tokio crate.
+
+        /*
         let cursor = Cursor::new(Vec::new());
         let mut zip_writer = ZipFileWriter::new(cursor.compat_write());
 
@@ -80,13 +82,17 @@ impl IggyShard {
                     let filename = format!("{snapshot_type}.txt");
                     let entry = ZipEntryBuilder::new(filename.clone().into(), 
compression);
 
-                    let file = File::open(temp_file.path()).await.map_err(|e| {
-                        error!("Failed to open temporary file: {}", e);
-                        IggyError::SnapshotFileCompletionFailed
-                    })?;
+                    let file = OpenOptions::new()
+                        .read(true)
+                        .open(temp_file.path())
+                        .await
+                        .map_err(|e| {
+                            error!("Failed to open temporary file: {}", e);
+                            IggyError::SnapshotFileCompletionFailed
+                        })?;
 
                     let content = Vec::new();
-                    let (result, content) = file.read_exact_at(content, 
0).await;
+                    let (result, content) = file.read_exact_at(content, 
0).await.into();
                     if let Err(e) = result {
                         error!("Failed to read temporary file: {}", e);
                         continue;
@@ -112,7 +118,9 @@ impl IggyShard {
                 }
             }
         }
+        */
 
+        /*
         info!(
             "Snapshot commands {:?} finished in {}",
             snapshot_types,
@@ -128,7 +136,8 @@ impl IggyShard {
         let zip_data = cursor.into_inner();
 
         info!("Final zip size: {} bytes", zip_data.len());
-        Ok(Snapshot::new(zip_data))
+        */
+        Ok(Snapshot::new(vec![]))
     }
 }
 
@@ -137,8 +146,11 @@ async fn write_command_output_to_temp_file(
 ) -> Result<NamedTempFile, std::io::Error> {
     let output = command.output().await?;
     let temp_file = NamedTempFile::new()?;
-    let file = File::from_std(temp_file.as_file().try_clone()?).unwrap();
-    let (result, _) = file.write_all_at(output.stdout, 0).await;
+    let mut file = OpenOptions::new()
+        .write(true)
+        .open(temp_file.path())
+        .await?;
+    let (result, _) = file.write_all_at(output.stdout, 0).await.into();
     result?;
     file.sync_all().await?;
     Ok(temp_file)
@@ -150,33 +162,39 @@ async fn get_filesystem_overview() -> 
Result<NamedTempFile, std::io::Error> {
 
 async fn get_process_info() -> Result<NamedTempFile, std::io::Error> {
     let temp_file = NamedTempFile::new()?;
-    let file = File::from_std(temp_file.as_file().try_clone()?).unwrap();
+    let mut file = OpenOptions::new()
+        .create(true)
+        .write(true)
+        .open(temp_file.path())
+        .await?;
 
     let mut position = 0;
     let ps_output = Command::new("ps").arg("aux").output().await?;
     let (result, written) = file
         .write_all_at(b"=== Process List (ps aux) ===\n", 0)
-        .await;
+        .await
+        .into();
     result?;
     position += written.len() as u64;
 
-    let (result, written) = file.write_all_at(ps_output.stdout, 
position).await;
+    let (result, written) = file.write_all_at(ps_output.stdout, 
position).await.into();
     result?;
     position += written.len() as u64;
 
-    let (result, written) = file.write_all_at(b"\n\n", position).await;
+    let (result, written) = file.write_all_at(b"\n\n", position).await.into();
     result?;
     position += written.len() as u64;
 
     let (result, written) = file
         .write_all_at(b"=== Detailed Process Information ===\n", position)
-        .await;
+        .await
+        .into();
     result?;
     position += written.len() as u64;
 
     let proc_info = procdump::get_proc_info().await?;
     let bytes = proc_info.as_bytes().to_owned();
-    let (result, _) = file.write_all_at(bytes, position).await;
+    let (result, _) = file.write_all_at(bytes, position).await.into();
     result?;
     file.sync_all().await?;
 
diff --git a/core/server/src/shard/system/storage.rs 
b/core/server/src/shard/system/storage.rs
index 3d895d4c..e1e7cf84 100644
--- a/core/server/src/shard/system/storage.rs
+++ b/core/server/src/shard/system/storage.rs
@@ -17,18 +17,17 @@
  */
 
 use super::COMPONENT;
-use crate::io::file::IggyFile;
 use crate::shard::system::info::SystemInfo;
 use crate::streaming::persistence::persister::PersisterKind;
 use crate::streaming::storage::SystemInfoStorage;
 use crate::streaming::utils::PooledBuffer;
 use crate::streaming::utils::file;
 use anyhow::Context;
+use compio::io::AsyncReadAtExt;
+use compio::io::AsyncReadExt;
 use error_set::ErrContext;
 use iggy_common::IggyError;
-use monoio::io::AsyncReadRentExt;
 use std::sync::Arc;
-use tokio::io::AsyncReadExt;
 use tracing::info;
 
 #[derive(Debug)]
@@ -63,10 +62,12 @@ impl SystemInfoStorage for FileSystemInfoStorage {
             .map_err(|_| IggyError::CannotReadFileMetadata)?
             .len() as usize;
 
-        let mut file = IggyFile::new(file);
+        let file = file::open(&self.path)
+            .await
+            .map_err(|_| IggyError::CannotReadFile)?;
         let mut buffer = PooledBuffer::with_capacity(file_size);
         buffer.put_bytes(0, file_size);
-        let (result, buffer) = file.read_exact(buffer).await;
+        let (result, buffer) = file.read_exact_at(buffer, 0).await.into();
         result
             .with_error_context(|error| {
                 format!(
diff --git a/core/server/src/shard/system/streams.rs 
b/core/server/src/shard/system/streams.rs
index b4921066..bc8fc2bb 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -186,7 +186,7 @@ impl IggyShard {
         }))
     }
 
-    pub async fn create_stream_bypass_auth(
+    pub fn create_stream_bypass_auth(
         &self,
         stream_id: Option<u32>,
         name: &str,
diff --git a/core/server/src/shard/system/topics.rs 
b/core/server/src/shard/system/topics.rs
index 0f8155fb..d71b0f1d 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -112,8 +112,9 @@ impl IggyShard {
         compression_algorithm: CompressionAlgorithm,
         max_topic_size: MaxTopicSize,
         replication_factor: Option<u8>,
+        shards_assignment: Vec<(IggyNamespace, ShardInfo)>,
     ) -> Result<(), IggyError> {
-        let (topic_id, partition_ids) = self.create_topic_base(
+        let (topic_id, _) = self.create_topic_base(
             stream_id,
             topic_id,
             name,
@@ -132,22 +133,16 @@ impl IggyShard {
             .with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to get topic 
with ID: {topic_id} in stream with ID: {stream_id}")
             })?;
-        topic.persist().await.with_error_context(|error| {
-            format!("{COMPONENT} (error: {error}) - failed to persist topic: 
{topic}")
-        })?;
 
-        // TODO: Figure out a way how to distribute the shards table among 
different shards,
-        // without the need to do code from below, everytime we handle a 
`ShardEvent`.
-        // I think we shouldn't be sharing it tho and still maintain a single 
shard table per shard,
-        // but find a way how to distribute it smarter (maybe move the 
broadcast inside of the `insert_shard_table_records`) method.
-        let records = partition_ids.into_iter().map(|partition_id| {
-            let namespace = IggyNamespace::new(stream_id, topic_id, 
partition_id);
-            let hash = namespace.generate_hash();
-            let shard_id = hash % self.get_available_shards_count();
-            let shard_info = ShardInfo::new(shard_id as u16);
-            (namespace, shard_info)
-        });
-        self.insert_shard_table_records(records);
+        for (_, shard_info) in &shards_assignment {
+            if shard_info.id() == self.id {
+                topic.persist().await.with_error_context(|error| {
+                    format!("{COMPONENT} (error: {error}) - failed to persist 
topic: {topic}")
+                })?;
+            }
+        }
+
+        self.insert_shard_table_records(shards_assignment);
 
         self.metrics.increment_topics(1);
         self.metrics.increment_partitions(partitions_count);
@@ -167,7 +162,7 @@ impl IggyShard {
         compression_algorithm: CompressionAlgorithm,
         max_topic_size: MaxTopicSize,
         replication_factor: Option<u8>,
-    ) -> Result<Identifier, IggyError> {
+    ) -> Result<(Vec<(IggyNamespace, ShardInfo)>, Identifier), IggyError> {
         self.ensure_authenticated(session)?;
         {
             let stream = self.get_stream(stream_id).with_error_context(|error| 
{
@@ -208,6 +203,7 @@ impl IggyShard {
             format!("{COMPONENT} (error: {error}) - failed to persist topic: 
{topic}")
         })?;
 
+        // TODO: Refactor
         let records = partition_ids.into_iter().map(|partition_id| {
             let namespace = IggyNamespace::new(stream_id, topic_id, 
partition_id);
             let hash = namespace.generate_hash();
@@ -215,13 +211,14 @@ impl IggyShard {
             let shard_info = ShardInfo::new(shard_id as u16);
             (namespace, shard_info)
         });
-        self.insert_shard_table_records(records);
+        let records = records.collect::<Vec<_>>();
+        self.insert_shard_table_records(records.clone());
 
         self.metrics.increment_topics(1);
         self.metrics.increment_partitions(partitions_count);
         self.metrics.increment_segments(partitions_count);
 
-        Ok(Identifier::numeric(topic_id)?)
+        Ok((records, Identifier::numeric(topic_id)?))
     }
 
     fn create_topic_base(
diff --git a/core/server/src/shard/task_registry.rs 
b/core/server/src/shard/task_registry.rs
index 1a201d3b..ff674a8e 100644
--- a/core/server/src/shard/task_registry.rs
+++ b/core/server/src/shard/task_registry.rs
@@ -17,8 +17,8 @@
  */
 
 use async_channel::{Receiver, Sender, bounded};
+use compio::runtime::JoinHandle;
 use futures::future::join_all;
-use monoio::task::JoinHandle;
 use std::cell::RefCell;
 use std::collections::HashMap;
 use std::future::Future;
@@ -42,7 +42,7 @@ impl TaskRegistry {
     where
         F: Future<Output = ()> + 'static,
     {
-        let handle = monoio::spawn(future);
+        let handle = compio::runtime::spawn(future);
         self.tasks.borrow_mut().push(handle);
     }
 
@@ -85,8 +85,8 @@ impl TaskRegistry {
             .into_iter()
             .enumerate()
             .map(|(idx, handle)| async move {
-                match monoio::time::timeout(timeout, handle).await {
-                    Ok(()) => (idx, true),
+                match compio::time::timeout(timeout, handle).await {
+                    Ok(_) => (idx, true),
                     Err(_) => {
                         warn!("Task {} did not complete within timeout", idx);
                         (idx, false)
diff --git a/core/server/src/shard/tasks/messages.rs 
b/core/server/src/shard/tasks/messages.rs
index ec4018c0..d573c058 100644
--- a/core/server/src/shard/tasks/messages.rs
+++ b/core/server/src/shard/tasks/messages.rs
@@ -14,7 +14,7 @@ async fn run_shard_messages_receiver(shard: Rc<IggyShard>) -> 
Result<(), IggyErr
                 if shard.is_shutting_down() {
                     return;
                 }
-                monoio::time::sleep(Duration::from_millis(100)).await;
+                compio::time::sleep(Duration::from_millis(100)).await;
             }
         };
 
diff --git a/core/server/src/shard/transmission/event.rs 
b/core/server/src/shard/transmission/event.rs
index 49794c3f..60a28dfc 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -2,7 +2,7 @@ use std::net::SocketAddr;
 
 use iggy_common::{CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize};
 
-use crate::streaming::clients::client_manager::Transport;
+use crate::{shard::{namespace::IggyNamespace, ShardInfo}, 
streaming::clients::client_manager::Transport};
 
 #[derive(Debug)]
 pub enum ShardEvent {
@@ -24,6 +24,7 @@ pub enum ShardEvent {
         compression_algorithm: CompressionAlgorithm,
         max_topic_size: MaxTopicSize,
         replication_factor: Option<u8>,
+        shards_assignment: Vec<(IggyNamespace, ShardInfo)>,
     },
     //CreatedConsumerGroup(Identifier, Identifier, Option<u32>, String),
     //DeletedConsumerGroup(Identifier, Identifier, Identifier),
diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs
index e8eb6ee6..5716d7bb 100644
--- a/core/server/src/state/file.rs
+++ b/core/server/src/state/file.rs
@@ -16,28 +16,27 @@
  * under the License.
  */
 
-use crate::io::file::IggyFile;
-use crate::io::reader::IggyReader;
 use crate::state::command::EntryCommand;
 use crate::state::{COMPONENT, State, StateEntry};
 use crate::streaming::persistence::persister::PersisterKind;
 use crate::streaming::utils::file;
 use crate::versioning::SemanticVersion;
 use bytes::{Buf, BufMut, Bytes, BytesMut};
+use compio::fs::File;
+use compio::io::{AsyncReadExt, AsyncWriteExt};
 use error_set::ErrContext;
 use iggy_common::BytesSerializable;
 use iggy_common::EncryptorKind;
 use iggy_common::IggyByteSize;
 use iggy_common::IggyError;
 use iggy_common::IggyTimestamp;
-use monoio::io::AsyncReadRentExt;
 use std::fmt::Debug;
 use std::path::Path;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
 use tracing::{debug, error, info};
 
-pub const BUF_READER_CAPACITY_BYTES: usize = 512 * 1000;
+pub const BUF_cursor_CAPACITY_BYTES: usize = 512 * 1000;
 const FILE_STATE_PARSE_ERROR: &str = "STATE - failed to parse file state";
 
 #[derive(Debug)]
@@ -129,7 +128,6 @@ impl State for FileState {
             .map_err(|_| IggyError::CannotReadFileMetadata)?
             .len();
 
-        let file = IggyFile::new(file);
         if file_size == 0 {
             info!("State file is empty");
             return Ok(Vec::new());
@@ -141,11 +139,11 @@ impl State for FileState {
         );
         let mut entries = Vec::new();
         let mut total_size: u64 = 0;
-        let mut reader = IggyReader::with_capacity(BUF_READER_CAPACITY_BYTES, 
file);
+        let mut cursor = std::io::Cursor::new(file);
         let mut current_index = 0;
         let mut entries_count = 0;
         loop {
-            let index = reader
+            let index = cursor
                 .read_u64_le()
                 .await
                 .with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR} 
index. {error}"))
@@ -163,32 +161,32 @@ impl State for FileState {
 
             current_index = index;
             entries_count += 1;
-            let term = reader
+            let term = cursor
                 .read_u64_le()
                 .await
                 .with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR} 
term. {error}"))
                 .map_err(|_| IggyError::InvalidNumberEncoding)?;
             total_size += 8;
-            let leader_id = reader
+            let leader_id = cursor
                 .read_u32_le()
                 .await
                 .with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR} 
leader_id. {error}"))
                 .map_err(|_| IggyError::InvalidNumberEncoding)?;
             total_size += 4;
-            let version = reader
+            let version = cursor
                 .read_u32_le()
                 .await
                 .with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR} 
version. {error}"))
                 .map_err(|_| IggyError::InvalidNumberEncoding)?;
             total_size += 4;
-            let flags = reader
+            let flags = cursor
                 .read_u64_le()
                 .await
                 .with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR} 
flags. {error}"))
                 .map_err(|_| IggyError::InvalidNumberEncoding)?;
             total_size += 8;
             let timestamp = IggyTimestamp::from(
-                reader
+                cursor
                     .read_u64_le()
                     .await
                     .with_error_context(|error| {
@@ -197,19 +195,19 @@ impl State for FileState {
                     .map_err(|_| IggyError::InvalidNumberEncoding)?,
             );
             total_size += 8;
-            let user_id = reader
+            let user_id = cursor
                 .read_u32_le()
                 .await
                 .with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR} 
user_id. {error}"))
                 .map_err(|_| IggyError::InvalidNumberEncoding)?;
             total_size += 4;
-            let checksum = reader
+            let checksum = cursor
                 .read_u32_le()
                 .await
                 .with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR} 
checksum. {error}"))
                 .map_err(|_| IggyError::InvalidNumberEncoding)?;
             total_size += 4;
-            let context_length = reader
+            let context_length = cursor
                 .read_u32_le()
                 .await
                 .with_error_context(|error| {
@@ -220,20 +218,20 @@ impl State for FileState {
             total_size += 4;
             let mut context = BytesMut::with_capacity(context_length);
             context.put_bytes(0, context_length);
-            let (result, context) = reader.read_exact(context).await;
+            let (result, context) = cursor.read_exact(context).await.into();
 
             result
                 .with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR} 
code. {error}"))
                 .map_err(|_| IggyError::CannotReadFile)?;
             let context = context.freeze();
             total_size += context_length as u64;
-            let code = reader
+            let code = cursor
                 .read_u32_le()
                 .await
                 .with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR} 
code. {error}"))
                 .map_err(|_| IggyError::InvalidNumberEncoding)?;
             total_size += 4;
-            let mut command_length = reader
+            let mut command_length = cursor
                 .read_u32_le()
                 .await
                 .with_error_context(|error| {
@@ -244,7 +242,7 @@ impl State for FileState {
             total_size += 4;
             let mut command = BytesMut::with_capacity(command_length);
             command.put_bytes(0, command_length);
-            let (result, command) = reader.read_exact(command).await;
+            let (result, command) = cursor.read_exact(command).await.into();
             result
                 .with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR} 
command. {error}"))
                 .map_err(|_| IggyError::CannotReadFile)?;
diff --git a/core/server/src/streaming/partitions/storage.rs 
b/core/server/src/streaming/partitions/storage.rs
index 6621ea19..67e3d3f4 100644
--- a/core/server/src/streaming/partitions/storage.rs
+++ b/core/server/src/streaming/partitions/storage.rs
@@ -18,7 +18,6 @@
 
 use crate::compat::index_rebuilding::index_rebuilder::IndexRebuilder;
 use crate::configs::cache_indexes::CacheIndexesConfig;
-use crate::io::file::IggyFile;
 use crate::io::fs_utils;
 use crate::state::system::PartitionState;
 use crate::streaming::partitions::COMPONENT;
@@ -27,12 +26,12 @@ use crate::streaming::persistence::persister::PersisterKind;
 use crate::streaming::segments::*;
 use crate::streaming::storage::PartitionStorage;
 use crate::streaming::utils::file;
+use compio::fs;
+use compio::fs::create_dir_all;
+use compio::io::AsyncReadExt;
 use error_set::ErrContext;
 use iggy_common::ConsumerKind;
 use iggy_common::IggyError;
-use monoio::fs;
-use monoio::fs::create_dir_all;
-use monoio::io::AsyncReadRentExt;
 use std::path::Path;
 use std::sync::Arc;
 use std::sync::atomic::Ordering;
@@ -117,8 +116,18 @@ impl PartitionStorage for FilePartitionStorage {
             let messages_file_path = segment.messages_file_path().to_owned();
             let time_index_path = index_path.replace(INDEX_EXTENSION, 
"timeindex");
 
-            let index_path_exists = 
tokio::fs::try_exists(&index_path).await.unwrap();
-            let time_index_path_exists = 
tokio::fs::try_exists(&time_index_path).await.unwrap();
+            // TODO: Move to fs_utils
+            async fn try_exists(index_path: &str) -> Result<bool, 
std::io::Error> {
+                match compio::fs::metadata(index_path).await {
+                    Ok(_) => Ok(true),
+                    Err(err) => match err.kind() {
+                        std::io::ErrorKind::NotFound => Ok(false),
+                        _ => Err(err),
+                    },
+                }
+            }
+            let index_path_exists = try_exists(&index_path).await.unwrap();
+            let time_index_path_exists = 
try_exists(&time_index_path).await.unwrap();
             let index_cache_enabled = matches!(
                 partition.config.segment.cache_indexes,
                 CacheIndexesConfig::All | CacheIndexesConfig::OpenSegment
@@ -416,6 +425,7 @@ impl PartitionStorage for FilePartitionStorage {
 
         let mut consumer_offsets = Vec::new();
         let mut dir_entries = dir_entries.unwrap();
+        // TODO: reimplement using the compio dir walk
         while let Some(dir_entry) = dir_entries.next() {
             let dir_entry = dir_entry.unwrap();
             let metadata = dir_entry.metadata();
@@ -451,8 +461,9 @@ impl PartitionStorage for FilePartitionStorage {
                     )
                 })
                 .map_err(|_| IggyError::CannotReadFile)?;
-            let mut file = IggyFile::new(file);
-            let offset = file
+            // TODO: This is like awfull.
+            let mut cursor = std::io::Cursor::new(file);
+            let offset = cursor
                 .read_u64_le()
                 .await
                 .with_error_context(|error| {
diff --git a/core/server/src/streaming/persistence/persister.rs 
b/core/server/src/streaming/persistence/persister.rs
index 6ba5c87b..ca522b85 100644
--- a/core/server/src/streaming/persistence/persister.rs
+++ b/core/server/src/streaming/persistence/persister.rs
@@ -18,12 +18,13 @@
 
 use crate::streaming::persistence::COMPONENT;
 use crate::streaming::utils::file;
+use compio::buf::IoBuf;
+use compio::fs::remove_file;
+use compio::io::AsyncWriteAtExt;
 use error_set::ErrContext;
 use iggy_common::IggyError;
-use monoio::buf::IoBuf;
 use std::fmt::Debug;
 use std::future::Future;
-use tokio::fs;
 
 #[cfg(test)]
 use mockall::automock;
@@ -85,13 +86,13 @@ pub struct FileWithSyncPersister;
 
 impl Persister for FilePersister {
     async fn append<B: IoBuf>(&self, path: &str, bytes: B) -> Result<(), 
IggyError> {
-        let file = file::append(path)
+        let (mut file, position) = file::append(path)
             .await
             .with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to append to 
file: {path}")
             })
             .map_err(|_| IggyError::CannotAppendToFile)?;
-        file.write_all_at(bytes, 0)
+        file.write_all_at(bytes, position)
             .await
             .0
             .with_error_context(|error| {
@@ -102,7 +103,7 @@ impl Persister for FilePersister {
     }
 
     async fn overwrite<B: IoBuf>(&self, path: &str, bytes: B) -> Result<(), 
IggyError> {
-        let file = file::overwrite(path)
+        let mut file = file::overwrite(path)
             .await
             .with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to overwrite 
file: {path}")
@@ -120,7 +121,7 @@ impl Persister for FilePersister {
     }
 
     async fn delete(&self, path: &str) -> Result<(), IggyError> {
-        fs::remove_file(path)
+        remove_file(path)
             .await
             .with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to delete file: 
{path}")
@@ -132,17 +133,12 @@ impl Persister for FilePersister {
 
 impl Persister for FileWithSyncPersister {
     async fn append<B: IoBuf>(&self, path: &str, bytes: B) -> Result<(), 
IggyError> {
-        let file = file::append(path)
+        let (mut file, position) = file::append(path)
             .await
             .with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to append to 
file: {path}")
             })
             .map_err(|_| IggyError::CannotAppendToFile)?;
-        let position = file
-            .metadata()
-            .await
-            .map_err(|_| IggyError::CannotReadFileMetadata)?
-            .len();
         file.write_all_at(bytes, position)
             .await
             .0
@@ -162,7 +158,7 @@ impl Persister for FileWithSyncPersister {
     }
 
     async fn overwrite<B: IoBuf>(&self, path: &str, bytes: B) -> Result<(), 
IggyError> {
-        let file = file::overwrite(path)
+        let mut file = file::overwrite(path)
             .await
             .with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to overwrite 
file: {path}")
@@ -188,7 +184,7 @@ impl Persister for FileWithSyncPersister {
     }
 
     async fn delete(&self, path: &str) -> Result<(), IggyError> {
-        fs::remove_file(path)
+        remove_file(path)
             .await
             .with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to delete file: 
{path}")
diff --git a/core/server/src/streaming/persistence/task.rs 
b/core/server/src/streaming/persistence/task.rs
index d1c716aa..1b359ef7 100644
--- a/core/server/src/streaming/persistence/task.rs
+++ b/core/server/src/streaming/persistence/task.rs
@@ -18,18 +18,18 @@
 
 use crate::streaming::persistence::COMPONENT;
 use bytes::Bytes;
+use compio::runtime::Task;
 use error_set::ErrContext;
 use flume::{Receiver, Sender, unbounded};
 use iggy_common::IggyError;
-use monoio::task;
-use std::{sync::Arc, time::Duration};
+use std::{any::Any, sync::Arc, time::Duration};
 use tracing::error;
 
 use super::persister::PersisterKind;
 
 pub struct LogPersisterTask {
     _sender: Option<Sender<Bytes>>,
-    _task_handle: Option<task::JoinHandle<()>>,
+    _task_handle: Option<Task<Result<(), Box<dyn Any + Send>>>>,
 }
 
 impl std::fmt::Debug for LogPersisterTask {
@@ -50,7 +50,7 @@ impl LogPersisterTask {
     ) -> Self {
         let (sender, receiver): (Sender<Bytes>, Receiver<Bytes>) = unbounded();
 
-        let task_handle = monoio::spawn(async move {
+        let task_handle = compio::runtime::spawn(async move {
             loop {
                 match receiver.recv_async().await {
                     Ok(data) => {
@@ -129,7 +129,7 @@ impl Drop for LogPersisterTask {
         self._sender.take();
 
         if let Some(handle) = self._task_handle.take() {
-            monoio::spawn(async move { handle.await });
+            compio::runtime::spawn(async move { handle.await });
         }
     }
 }
diff --git a/core/server/src/streaming/segments/indexes/index_reader.rs 
b/core/server/src/streaming/segments/indexes/index_reader.rs
index 8b09666a..ce235ef4 100644
--- a/core/server/src/streaming/segments/indexes/index_reader.rs
+++ b/core/server/src/streaming/segments/indexes/index_reader.rs
@@ -17,13 +17,16 @@
  */
 
 use super::IggyIndexesMut;
-use crate::{io::file::IggyFile, streaming::utils::PooledBuffer};
+use crate::streaming::utils::PooledBuffer;
 use bytes::BytesMut;
+use compio::{
+    BufResult,
+    fs::{File, OpenOptions},
+    io::AsyncReadAtExt,
+};
 use error_set::ErrContext;
 use iggy_common::{INDEX_SIZE, IggyError, IggyIndex, IggyIndexView};
-use monoio::fs::OpenOptions;
 use std::{
-    fs::File as StdFile,
     io::ErrorKind,
     os::unix::fs::FileExt,
     sync::{
@@ -37,7 +40,7 @@ use tracing::{error, trace};
 #[derive(Debug)]
 pub struct IndexReader {
     file_path: String,
-    file: IggyFile,
+    file: File,
     index_size_bytes: Arc<AtomicU64>,
 }
 
@@ -51,7 +54,6 @@ impl IndexReader {
             .with_error_context(|error| format!("Failed to open index file: 
{file_path}. {error}"))
             .map_err(|_| IggyError::CannotReadFile)?;
 
-        let file = IggyFile::new(file);
         trace!(
             "Opened index file for reading: {file_path}, size: {}",
             index_size_bytes.load(Ordering::Acquire)
@@ -337,13 +339,13 @@ impl IndexReader {
         if use_pool {
             let mut buf = PooledBuffer::with_capacity(len as usize);
             unsafe { buf.set_len(len as usize) };
-            let (result, buf) = self.file.read_exact_at(buf, offset as 
u64).await;
+            let (result, buf) = self.file.read_exact_at(buf, offset as 
u64).await.into();
             result?;
             Ok(buf)
         } else {
             let mut buf = BytesMut::with_capacity(len as usize);
             unsafe { buf.set_len(len as usize) };
-            let (result, buf) = self.file.read_exact_at(buf, offset as 
u64).await;
+            let (result, buf) = self.file.read_exact_at(buf, offset as 
u64).await.into();
             result?;
             Ok(PooledBuffer::from_existing(buf))
         }
diff --git a/core/server/src/streaming/segments/indexes/index_writer.rs 
b/core/server/src/streaming/segments/indexes/index_writer.rs
index 4b79f9d2..416cde21 100644
--- a/core/server/src/streaming/segments/indexes/index_writer.rs
+++ b/core/server/src/streaming/segments/indexes/index_writer.rs
@@ -16,25 +16,25 @@
  * under the License.
  */
 
+use compio::fs::File;
+use compio::fs::OpenOptions;
+use compio::io::AsyncWriteAtExt;
 use error_set::ErrContext;
 use iggy_common::INDEX_SIZE;
 use iggy_common::IggyError;
-use monoio::fs::OpenOptions;
-use monoio::io::AsyncWriteRentExt;
 use std::sync::{
     Arc,
     atomic::{AtomicU64, Ordering},
 };
 use tracing::trace;
 
-use crate::io::file::IggyFile;
 use crate::streaming::utils::PooledBuffer;
 
 /// A dedicated struct for writing to the index file.
 #[derive(Debug)]
 pub struct IndexWriter {
     file_path: String,
-    file: IggyFile,
+    file: File,
     index_size_bytes: Arc<AtomicU64>,
     fsync: bool,
 }
@@ -48,15 +48,13 @@ impl IndexWriter {
         file_exists: bool,
     ) -> Result<Self, IggyError> {
         let file = OpenOptions::new()
-            .write(true)
-            .append(true)
             .create(true)
+            .write(true)
             .open(file_path)
             .await
             .with_error_context(|error| format!("Failed to open index file: 
{file_path}. {error}"))
             .map_err(|_| IggyError::CannotReadFile)?;
 
-        let file = IggyFile::from(file);
         if file_exists {
             let _ = file.sync_all().await.with_error_context(|error| {
                 format!("Failed to fsync index file after creation: 
{file_path}. {error}",)
@@ -96,8 +94,9 @@ impl IndexWriter {
         let count = indexes.len() / INDEX_SIZE;
         let len = indexes.len();
 
+        let position = self.index_size_bytes.load(Ordering::Relaxed);
         self.file
-            .write_all(indexes)
+            .write_all_at(indexes, position)
             .await
             .0
             .with_error_context(|error| {
diff --git a/core/server/src/streaming/segments/messages/messages_reader.rs 
b/core/server/src/streaming/segments/messages/messages_reader.rs
index 4091e8e3..e81eb793 100644
--- a/core/server/src/streaming/segments/messages/messages_reader.rs
+++ b/core/server/src/streaming/segments/messages/messages_reader.rs
@@ -16,13 +16,13 @@
  * under the License.
  */
 
-use crate::io::file::IggyFile;
 use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
-use crate::streaming::utils::{PooledBuffer, file};
+use crate::streaming::utils::PooledBuffer;
 use bytes::BytesMut;
+use compio::fs::{File, OpenOptions};
+use compio::io::AsyncReadAtExt;
 use error_set::ErrContext;
 use iggy_common::IggyError;
-use monoio::fs::File;
 use std::{
     io::ErrorKind,
     sync::{
@@ -36,7 +36,7 @@ use tracing::{error, trace};
 #[derive(Debug)]
 pub struct MessagesReader {
     file_path: String,
-    file: IggyFile,
+    file: File,
     messages_size_bytes: Arc<AtomicU64>,
 }
 
@@ -46,9 +46,12 @@ impl MessagesReader {
         file_path: &str,
         messages_size_bytes: Arc<AtomicU64>,
     ) -> Result<Self, IggyError> {
-        let file = file::open_std(file_path)
+        let file = OpenOptions::new()
+            .read(true)
+            .open(file_path)
+            .await
             .with_error_context(|error| {
-                format!("Failed to open messages file: {file_path}, error: 
{error}")
+                format!("Failed to open messages file: {file_path}. {error}")
             })
             .map_err(|_| IggyError::CannotReadFile)?;
 
@@ -67,7 +70,6 @@ impl MessagesReader {
                 )
             });
         }
-        let file = File::from_std(file).unwrap();
 
         trace!(
             "Opened messages file for reading: {file_path}, size: {}",
@@ -177,13 +179,13 @@ impl MessagesReader {
         if use_pool {
             let mut buf = PooledBuffer::with_capacity(len as usize);
             unsafe { buf.set_len(len as usize) };
-            let (result, buf) = self.file.read_exact_at(buf, offset as 
u64).await;
+            let (result, buf) = self.file.read_exact_at(buf, offset as 
u64).await.into();
             result?;
             Ok(buf)
         } else {
             let mut buf = BytesMut::with_capacity(len as usize);
             unsafe { buf.set_len(len as usize) };
-            let (result, buf) = self.file.read_exact_at(buf, offset as 
u64).await;
+            let (result, buf) = self.file.read_exact_at(buf, offset as 
u64).await.into();
             result?;
             Ok(PooledBuffer::from_existing(buf))
         }
diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs 
b/core/server/src/streaming/segments/messages/messages_writer.rs
index 63e424a9..f3d43d1e 100644
--- a/core/server/src/streaming/segments/messages/messages_writer.rs
+++ b/core/server/src/streaming/segments/messages/messages_writer.rs
@@ -16,13 +16,10 @@
  * under the License.
  */
 
-use crate::{
-    io::file::IggyFile,
-    streaming::segments::{IggyMessagesBatchSet, messages::write_batch},
-};
+use crate::streaming::segments::{IggyMessagesBatchSet, messages::write_batch};
+use compio::fs::{File, OpenOptions};
 use error_set::ErrContext;
 use iggy_common::{IggyByteSize, IggyError};
-use monoio::fs::{File, OpenOptions};
 use std::sync::{
     Arc,
     atomic::{AtomicU64, Ordering},
@@ -34,7 +31,7 @@ use tracing::{error, trace};
 pub struct MessagesWriter {
     file_path: String,
     /// Holds the file for synchronous writes; when asynchronous persistence 
is enabled, this will be None.
-    file: Option<IggyFile>,
+    file: Option<File>,
     /// When set, asynchronous writes are handled by this persister task.
     messages_size_bytes: Arc<AtomicU64>,
     fsync: bool,
@@ -53,11 +50,13 @@ impl MessagesWriter {
         file_exists: bool,
     ) -> Result<Self, IggyError> {
         let file = OpenOptions::new()
-            .write(true)
             .create(true)
-            .append(true)
+            .write(true)
             .open(file_path)
             .await
+            .with_error_context(|err| {
+                format!("Failed to open messages file: {file_path}, error: 
{err}")
+            })
             .map_err(|_| IggyError::CannotReadFile)?;
 
         if file_exists {
@@ -82,7 +81,7 @@ impl MessagesWriter {
             messages_size_bytes.load(Ordering::Acquire)
         );
 
-        let file = Some(IggyFile::new(file));
+        let file = Some(file);
         Ok(Self {
             file_path: file_path.to_string(),
             file,
@@ -103,16 +102,16 @@ impl MessagesWriter {
             "Saving batch set of size {messages_size} bytes 
({containers_count} containers, {messages_count} messages) to messages file: 
{}",
             self.file_path
         );
+        let position = self.messages_size_bytes.load(Ordering::Relaxed);
         if let Some(ref mut file) = self.file {
-            write_batch(file, &self.file_path, batch_set)
+            write_batch(file, position, batch_set)
                 .await
                 .with_error_context(|error| {
                     format!(
                         "Failed to write batch to messages file: {}. {error}",
                         self.file_path
                     )
-                })
-                .map_err(|_| IggyError::CannotWriteToFile)?;
+                })?;
         } else {
             error!("File handle is not available for synchronous write.");
             return Err(IggyError::CannotWriteToFile);
diff --git a/core/server/src/streaming/segments/messages/mod.rs 
b/core/server/src/streaming/segments/messages/mod.rs
index 9799608f..a3989a2e 100644
--- a/core/server/src/streaming/segments/messages/mod.rs
+++ b/core/server/src/streaming/segments/messages/mod.rs
@@ -19,37 +19,25 @@
 mod messages_reader;
 mod messages_writer;
 
-use crate::io::file::IggyFile;
-
 use super::IggyMessagesBatchSet;
-use error_set::ErrContext;
+use compio::{fs::File, io::AsyncWriteAtExt};
 use iggy_common::IggyError;
-use monoio::io::AsyncWriteRentExt;
 
 pub use messages_reader::MessagesReader;
 pub use messages_writer::MessagesWriter;
 
 /// Vectored write a batches of messages to file
 async fn write_batch(
-    file: &mut IggyFile,
-    file_path: &str,
+    file: &mut File,
+    position: u64,
     mut batches: IggyMessagesBatchSet,
 ) -> Result<usize, IggyError> {
-    //let mut slices = batches.iter().map(|b| 
to_iovec(&b)).collect::<Vec<iovec>>();
-    let mut total_written = 0;
-    // TODO: Fork monoio, piece of shit runtime.
-    for batch in batches.iter_mut() {
-        let messages = batch.take_messages();
-        let writen = file
-            .write_all(messages)
-            .await
-            .0
-            .with_error_context(|error| {
-                format!("Failed to write messages to file: {file_path}, error: 
{error}",)
-                // TODO: Better error variant.
-            })
-            .map_err(|_| IggyError::CannotAppendMessage)?;
-        total_written += writen;
-    }
+    let total_written = batches.iter().map(|b| b.size() as usize).sum();
+    let batches = batches
+        .iter_mut()
+        .map(|b| b.take_messages())
+        .collect::<Vec<_>>();
+    let (result, _) = file.write_vectored_all_at(batches, 
position).await.into();
+    result.map_err(|_| IggyError::CannotWriteToFile)?;
     Ok(total_written)
 }
diff --git a/core/server/src/streaming/segments/segment.rs 
b/core/server/src/streaming/segments/segment.rs
index 9887df90..c13ce07a 100644
--- a/core/server/src/streaming/segments/segment.rs
+++ b/core/server/src/streaming/segments/segment.rs
@@ -21,6 +21,7 @@ use super::messages::*;
 use super::messages_accumulator::MessagesAccumulator;
 use crate::configs::system::SystemConfig;
 use crate::streaming::segments::*;
+use compio::fs::remove_file;
 use error_set::ErrContext;
 use iggy_common::INDEX_SIZE;
 use iggy_common::IggyByteSize;
@@ -30,7 +31,6 @@ use iggy_common::IggyTimestamp;
 use std::rc::Rc;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicU64, Ordering};
-use tokio::fs::remove_file;
 use tracing::{info, warn};
 
 const SIZE_16MB: usize = 16 * 1024 * 1024;
@@ -302,7 +302,7 @@ impl Segment {
     pub async fn shutdown_writing(&mut self) {
         if let Some(log_writer) = self.messages_writer.take() {
             //TODO: Fixme not sure whether we should spawn a task here.
-            monoio::spawn(async move {
+            compio::runtime::spawn(async move {
                 let _ = log_writer.fsync().await;
             });
         } else {
@@ -314,7 +314,7 @@ impl Segment {
 
         if let Some(index_writer) = self.index_writer.take() {
             //TODO: Fixme not sure whether we should spawn a task here.
-            monoio::spawn(async move {
+            compio::runtime::spawn(async move {
                 let _ = index_writer.fsync().await;
                 drop(index_writer)
             });
diff --git a/core/server/src/streaming/streams/storage.rs 
b/core/server/src/streaming/streams/storage.rs
index d9848f03..b2b9672c 100644
--- a/core/server/src/streaming/streams/storage.rs
+++ b/core/server/src/streaming/streams/storage.rs
@@ -23,12 +23,12 @@ use crate::streaming::streams::COMPONENT;
 use crate::streaming::streams::stream::Stream;
 use crate::streaming::topics::topic::Topic;
 use ahash::AHashSet;
+use compio::fs;
+use compio::fs::create_dir_all;
 use error_set::ErrContext;
 use futures::future::join_all;
 use iggy_common::IggyError;
 use iggy_common::IggyTimestamp;
-use monoio::fs;
-use monoio::fs::create_dir_all;
 use serde::{Deserialize, Serialize};
 use std::path::Path;
 use std::sync::Arc;
@@ -157,7 +157,7 @@ impl StreamStorage for FileStreamStorage {
         for mut topic in unloaded_topics {
             let loaded_topics = loaded_topics.clone();
             let topic_state = state.topics.remove(&topic.topic_id).unwrap();
-            let load_topic = monoio::spawn(async move {
+            let load_topic = compio::runtime::spawn(async move {
                 match topic.load(topic_state).await {
                     Ok(_) => loaded_topics.lock().await.push(topic),
                     Err(error) => error!(
diff --git a/core/server/src/streaming/topics/storage.rs 
b/core/server/src/streaming/topics/storage.rs
index 8d92b9d1..f7a1fe38 100644
--- a/core/server/src/streaming/topics/storage.rs
+++ b/core/server/src/streaming/topics/storage.rs
@@ -25,13 +25,13 @@ use crate::streaming::topics::consumer_group::ConsumerGroup;
 use crate::streaming::topics::topic::Topic;
 use ahash::AHashSet;
 use anyhow::Context;
+use compio::fs;
+use compio::fs::create_dir_all;
 use error_set::ErrContext;
 use futures::future::join_all;
 use iggy_common::IggyError;
 use iggy_common::locking::IggyRwLock;
 use iggy_common::locking::IggySharedMutFn;
-use monoio::fs;
-use monoio::fs::create_dir_all;
 use serde::{Deserialize, Serialize};
 use std::path::Path;
 use std::sync::Arc;
@@ -188,7 +188,7 @@ impl TopicStorage for FileTopicStorage {
         for mut partition in unloaded_partitions {
             let loaded_partitions = loaded_partitions.clone();
             let partition_state = 
state.partitions.remove(&partition.partition_id).unwrap();
-            let load_partition = monoio::spawn(async move {
+            let load_partition = compio::runtime::spawn(async move {
                 match partition.load(partition_state).await {
                     Ok(_) => {
                         loaded_partitions.lock().await.push(partition);
diff --git a/core/server/src/streaming/utils/file.rs 
b/core/server/src/streaming/utils/file.rs
index 7a5bb524..de00aa5e 100644
--- a/core/server/src/streaming/utils/file.rs
+++ b/core/server/src/streaming/utils/file.rs
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-use monoio::fs::{File, OpenOptions, remove_file};
+use compio::fs::{File, OpenOptions, remove_file};
 use std::path::Path;
 
 pub fn open_std(path: &str) -> Result<std::fs::File, std::io::Error> {
@@ -27,8 +27,14 @@ pub async fn open(path: &str) -> Result<File, 
std::io::Error> {
     OpenOptions::new().read(true).open(path).await
 }
 
-pub async fn append(path: &str) -> Result<File, std::io::Error> {
-    OpenOptions::new().read(true).append(true).open(path).await
+pub async fn append(path: &str) -> Result<(File, u64), std::io::Error> {
+    let file = OpenOptions::new()
+        .create(true)
+        .write(true)
+        .open(path)
+        .await?;
+    let position = file.metadata().await?.len();
+    Ok((file, position))
 }
 
 pub async fn overwrite(path: &str) -> Result<File, std::io::Error> {
@@ -45,7 +51,7 @@ pub async fn remove(path: &str) -> Result<(), std::io::Error> 
{
 }
 
 pub async fn rename(old_path: &str, new_path: &str) -> Result<(), 
std::io::Error> {
-    monoio::fs::rename(Path::new(old_path), Path::new(new_path)).await
+    compio::fs::rename(Path::new(old_path), Path::new(new_path)).await
 }
 
 pub async fn exists(path: &str) -> Result<bool, std::io::Error> {
diff --git a/core/server/src/streaming/utils/pooled_buffer.rs 
b/core/server/src/streaming/utils/pooled_buffer.rs
index a8664325..39ef25be 100644
--- a/core/server/src/streaming/utils/pooled_buffer.rs
+++ b/core/server/src/streaming/utils/pooled_buffer.rs
@@ -18,7 +18,7 @@
 
 use super::memory_pool::{BytesMutExt, memory_pool};
 use bytes::{Buf, BufMut, BytesMut};
-use monoio::buf::{IoBuf, IoBufMut};
+use compio::buf::{IoBuf, IoBufMut, SetBufInit};
 use std::ops::{Deref, DerefMut};
 
 #[derive(Debug)]
@@ -217,26 +217,32 @@ impl Buf for PooledBuffer {
     }
 }
 
-unsafe impl IoBufMut for PooledBuffer {
-    fn write_ptr(&mut self) -> *mut u8 {
-        self.inner.write_ptr()
-    }
-
-    fn bytes_total(&mut self) -> usize {
-        self.inner.bytes_total()
+impl SetBufInit for PooledBuffer {
+    unsafe fn set_buf_init(&mut self, len: usize) {
+        if self.inner.len() <= len {
+            unsafe {
+                self.inner.set_len(len);
+            }
+        }
     }
+}
 
-    unsafe fn set_init(&mut self, pos: usize) {
-        unsafe { self.inner.set_init(pos) }
+unsafe impl IoBufMut for PooledBuffer {
+    fn as_buf_mut_ptr(&mut self) -> *mut u8 {
+        self.inner.as_buf_mut_ptr()
     }
 }
 
 unsafe impl IoBuf for PooledBuffer {
-    fn read_ptr(&self) -> *const u8 {
-        self.inner.read_ptr()
+    fn as_buf_ptr(&self) -> *const u8 {
+        self.inner.as_buf_ptr()
+    }
+
+    fn buf_len(&self) -> usize {
+        self.inner.len()
     }
 
-    fn bytes_init(&self) -> usize {
-        self.inner.bytes_init()
+    fn buf_capacity(&self) -> usize {
+        self.inner.capacity()
     }
 }
diff --git a/core/server/src/tcp/connection_handler.rs 
b/core/server/src/tcp/connection_handler.rs
index 418af387..9c6803b8 100644
--- a/core/server/src/tcp/connection_handler.rs
+++ b/core/server/src/tcp/connection_handler.rs
@@ -62,6 +62,7 @@ pub(crate) async fn handle_connection(
                         if error.as_code() == 
IggyError::ConnectionClosed.as_code() {
                             return Err(ConnectionError::from(error));
                         } else {
+                            error!("got error: {:?}", error);
                             sender.send_error_response(error).await?;
                             continue;
                         }
diff --git a/core/server/src/tcp/sender.rs b/core/server/src/tcp/sender.rs
index 2f445d4f..2c0ecfa9 100644
--- a/core/server/src/tcp/sender.rs
+++ b/core/server/src/tcp/sender.rs
@@ -16,30 +16,37 @@
  * under the License.
  */
 
-use bytes::BytesMut;
-use iggy_common::IggyError;
-use monoio::{
-    buf::IoBufMut,
-    io::{AsyncReadRent, AsyncReadRentExt, AsyncWriteRent, AsyncWriteRentExt},
+use bytes::{Bytes, BytesMut};
+use compio::{
+    BufResult,
+    buf::{IoBuf, IoBufMut},
+    io::{AsyncRead, AsyncReadAtExt, AsyncReadExt, AsyncWriteExt},
 };
+use iggy_common::IggyError;
 use nix::libc;
 use std::io::IoSlice;
-use tracing::debug;
+use tracing::{debug, error};
+
+use crate::streaming::utils::PooledBuffer;
 
 const STATUS_OK: &[u8] = &[0; 4];
 
 pub(crate) async fn read<T, B>(stream: &mut T, buffer: B) -> (Result<usize, 
IggyError>, B)
 where
-    T: AsyncReadRent + AsyncWriteRent + Unpin,
+    T: AsyncReadExt + AsyncWriteExt + Unpin,
     B: IoBufMut,
 {
-    match stream.read_exact(buffer).await {
-        (Ok(0), buffer) => (Err(IggyError::ConnectionClosed), buffer),
-        (Ok(read_bytes), buffer) => (Ok(read_bytes), buffer),
+    let BufResult(result, buffer) = stream.read_exact(buffer).await;
+    match (result, buffer) {
+        (Ok(_), buffer) => (Ok(buffer.buf_len()), buffer),
+        // TODO: How to handle this ?(Ok(0), buffer) => 
(Err(IggyError::ConnectionClosed), buffer),
+        // `read_exact` from compio doesn't return how many bytes it read.
         (Err(error), buffer) => {
             if error.kind() == std::io::ErrorKind::UnexpectedEof {
+                //error!("Got some error tho.. {}", error);
                 (Err(IggyError::ConnectionClosed), buffer)
             } else {
+                //error!("Got some other error tho.. {}", error);
                 (Err(IggyError::TcpError), buffer)
             }
         }
@@ -48,14 +55,14 @@ where
 
 pub(crate) async fn send_empty_ok_response<T>(stream: &mut T) -> Result<(), 
IggyError>
 where
-    T: AsyncReadRent + AsyncWriteRent + Unpin,
+    T: AsyncReadExt + AsyncWriteExt + Unpin,
 {
     send_ok_response(stream, &[]).await
 }
 
 pub(crate) async fn send_ok_response<T>(stream: &mut T, payload: &[u8]) -> 
Result<(), IggyError>
 where
-    T: AsyncReadRent + AsyncWriteRent + Unpin,
+    T: AsyncReadExt + AsyncWriteExt + Unpin,
 {
     send_response(stream, STATUS_OK, payload).await
 }
@@ -63,10 +70,10 @@ where
 pub(crate) async fn send_ok_response_vectored<T>(
     stream: &mut T,
     length: &[u8],
-    slices: Vec<libc::iovec>,
+    slices: Vec<PooledBuffer>,
 ) -> Result<(), IggyError>
 where
-    T: AsyncReadRentExt + AsyncWriteRentExt + Unpin,
+    T: AsyncReadExt + AsyncWriteExt + Unpin,
 {
     send_response_vectored(stream, STATUS_OK, length, slices).await
 }
@@ -76,7 +83,7 @@ pub(crate) async fn send_error_response<T>(
     error: IggyError,
 ) -> Result<(), IggyError>
 where
-    T: AsyncReadRent + AsyncWriteRent + Unpin,
+    T: AsyncReadExt + AsyncWriteExt + Unpin,
 {
     send_response(stream, &error.as_code().to_le_bytes(), &[]).await
 }
@@ -87,7 +94,7 @@ pub(crate) async fn send_response<T>(
     payload: &[u8],
 ) -> Result<(), IggyError>
 where
-    T: AsyncReadRent + AsyncWriteRent + Unpin,
+    T: AsyncReadExt + AsyncWriteExt + Unpin,
 {
     debug!(
         "Sending response of len: {} with status: {:?}...",
@@ -108,32 +115,25 @@ pub(crate) async fn send_response_vectored<T>(
     stream: &mut T,
     status: &[u8],
     length: &[u8],
-    mut slices: Vec<libc::iovec>,
+    mut slices: Vec<PooledBuffer>,
 ) -> Result<(), IggyError>
 where
-    T: AsyncReadRentExt + AsyncWriteRentExt + Unpin,
+    T: AsyncReadExt + AsyncWriteExt + Unpin,
 {
+    let resp_status = u32::from_le_bytes(status.try_into().unwrap());
     debug!(
         "Sending vectored response of len: {} with status: {:?}...",
         slices.len(),
-        status
+        resp_status
     );
-    let prefix = [
-        libc::iovec {
-            iov_base: status.as_ptr() as _,
-            iov_len: status.len(),
-        },
-        libc::iovec {
-            iov_base: length.as_ptr() as _,
-            iov_len: length.len(),
-        },
-    ];
-    slices.splice(0..0, prefix);
+    let status = PooledBuffer::from(status);
+    let length = PooledBuffer::from(length);
+    slices.splice(0..0, [status, length]);
     stream
         .write_vectored_all(slices)
         .await
         .0
         .map_err(|_| IggyError::TcpError)?;
-    debug!("Sent response with status: {:?}", status);
+    debug!("Sent response with status: {:?}", resp_status);
     Ok(())
 }
diff --git a/core/server/src/tcp/tcp_listener.rs 
b/core/server/src/tcp/tcp_listener.rs
index 4f528016..b541305b 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -29,13 +29,19 @@ use std::rc::Rc;
 use std::time::Duration;
 use tracing::{error, info};
 
-pub async fn start(server_name: &'static str, addr: SocketAddr, socket: 
Socket, shard: Rc<IggyShard>) -> Result<(), IggyError> {
+pub async fn start(
+    server_name: &'static str,
+    addr: SocketAddr,
+    socket: Socket,
+    shard: Rc<IggyShard>,
+) -> Result<(), IggyError> {
     socket
         .bind(&addr.into())
         .expect("Failed to bind TCP listener");
     socket.listen(1024).unwrap();
+    //TODO: Fix it later to use `TcpOpts` from compio.
     let listener: std::net::TcpListener = socket.into();
-    let listener = monoio::net::TcpListener::from_std(listener).unwrap();
+    let listener = compio::net::TcpListener::from_std(listener).unwrap();
     info!("{server_name} server has started on: {:?}", addr);
 
     loop {
@@ -44,7 +50,7 @@ pub async fn start(server_name: &'static str, addr: 
SocketAddr, socket: Socket,
                 if shard.is_shutting_down() {
                     return;
                 }
-                monoio::time::sleep(Duration::from_millis(100)).await;
+                compio::time::sleep(Duration::from_millis(100)).await;
             }
         };
 
diff --git a/core/server/src/tcp/tcp_sender.rs 
b/core/server/src/tcp/tcp_sender.rs
index cfa25aad..d0a1c27c 100644
--- a/core/server/src/tcp/tcp_sender.rs
+++ b/core/server/src/tcp/tcp_sender.rs
@@ -17,14 +17,15 @@
  */
 
 use crate::binary::sender::Sender;
+use crate::streaming::utils::PooledBuffer;
 use crate::tcp::COMPONENT;
 use crate::{server_error::ServerError, tcp::sender};
 use bytes::BytesMut;
+use compio::buf::{IoBuf, IoBufMut};
+use compio::io::AsyncWrite;
+use compio::net::TcpStream;
 use error_set::ErrContext;
 use iggy_common::IggyError;
-use monoio::buf::IoBufMut;
-use monoio::io::AsyncWriteRent;
-use monoio::net::TcpStream;
 use nix::libc;
 
 #[derive(Debug)]
@@ -62,7 +63,7 @@ impl Sender for TcpSender {
     async fn send_ok_response_vectored(
         &mut self,
         length: &[u8],
-        slices: Vec<libc::iovec>,
+        slices: Vec<PooledBuffer>,
     ) -> Result<(), IggyError> {
         sender::send_ok_response_vectored(&mut self.stream, length, 
slices).await
     }
diff --git a/core/server/src/tcp/tcp_tls_listener.rs 
b/core/server/src/tcp/tcp_tls_listener.rs
index 2c81eb60..00d7b5c4 100644
--- a/core/server/src/tcp/tcp_tls_listener.rs
+++ b/core/server/src/tcp/tcp_tls_listener.rs
@@ -24,7 +24,6 @@ use crate::streaming::clients::client_manager::Transport;
 use crate::tcp::connection_handler::{handle_connection, handle_error};
 use futures::FutureExt;
 use iggy_common::IggyError;
-use monoio_rustls::TlsAcceptor;
 use rustls::ServerConfig;
 use rustls::pki_types::{CertificateDer, PrivateKeyDer};
 use rustls_pemfile::{certs, private_key};
@@ -42,106 +41,108 @@ pub(crate) async fn start(
     socket: Socket,
     shard: Rc<IggyShard>,
 ) -> Result<(), IggyError> {
-        let _ = 
rustls::crypto::aws_lc_rs::default_provider().install_default();
-        let config = &shard.config.tcp.tls;
-
-        let (certs, key) =
-            if config.self_signed && 
!std::path::Path::new(&config.cert_file).exists() {
-                info!("Generating self-signed certificate for TCP TLS server");
-                generate_self_signed_cert()
-                    .unwrap_or_else(|e| panic!("Failed to generate self-signed 
certificate: {e}"))
-            } else {
-                load_certificates(&config.cert_file, &config.key_file)
-                    .unwrap_or_else(|e| panic!("Failed to load certificates: 
{e}"))
-            };
-
-        let server_config = ServerConfig::builder()
-            .with_no_client_auth()
-            .with_single_cert(certs, key)
-            .unwrap_or_else(|e| panic!("Unable to create TLS server config: 
{e}"));
-
-        let acceptor = TlsAcceptor::from(Arc::new(server_config));
-
-        socket
-            .bind(&addr.into())
-            .unwrap_or_else(|e| panic!("Unable to bind socket to address 
'{addr}': {e}",));
-
-        let listener: std::net::TcpListener = socket.into();
-        let listener = monoio::net::TcpListener::from_std(listener).unwrap();
-        info!("{server_name} server has started on: {:?}", addr);
-
-        loop {
-            let shutdown_check = async {
-                loop {
-                    if shard.is_shutting_down() {
-                        return;
-                    }
-                    monoio::time::sleep(Duration::from_millis(100)).await;
+    /*
+    let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
+    let config = &shard.config.tcp.tls;
+
+    let (certs, key) =
+        if config.self_signed && 
!std::path::Path::new(&config.cert_file).exists() {
+            info!("Generating self-signed certificate for TCP TLS server");
+            generate_self_signed_cert()
+                .unwrap_or_else(|e| panic!("Failed to generate self-signed 
certificate: {e}"))
+        } else {
+            load_certificates(&config.cert_file, &config.key_file)
+                .unwrap_or_else(|e| panic!("Failed to load certificates: {e}"))
+        };
+
+    let server_config = ServerConfig::builder()
+        .with_no_client_auth()
+        .with_single_cert(certs, key)
+        .unwrap_or_else(|e| panic!("Unable to create TLS server config: {e}"));
+
+    let acceptor = TlsAcceptor::from(Arc::new(server_config));
+
+    socket
+        .bind(&addr.into())
+        .unwrap_or_else(|e| panic!("Unable to bind socket to address '{addr}': 
{e}",));
+
+    let listener: std::net::TcpListener = socket.into();
+    let listener = monoio::net::TcpListener::from_std(listener).unwrap();
+    info!("{server_name} server has started on: {:?}", addr);
+
+    loop {
+        let shutdown_check = async {
+            loop {
+                if shard.is_shutting_down() {
+                    return;
                 }
-            };
+                monoio::time::sleep(Duration::from_millis(100)).await;
+            }
+        };
 
-            let accept_future = listener.accept();
-            futures::select! {
-                _ = shutdown_check.fuse() => {
-                    info!("TCP TLS server detected shutdown flag, no longer 
accepting connections");
-                    break;
-                }
-                result = accept_future.fuse() => {
-                    match result {
-                        Ok((stream, address)) => {
-                            if shard.is_shutting_down() {
-                                info!("Rejecting new TLS connection from {} 
during shutdown", address);
-                                continue;
-                            }
-                            let shard_clone = shard.clone();
-                            info!("Accepted new TCP TLS connection: {}", 
address);
-                            let transport = Transport::Tcp;
-                            let session = shard_clone.add_client(&address, 
transport);
-                            //TODO: Those can be shared with other shards.
-                            shard_clone.add_active_session(session.clone());
-                            // Broadcast session to all shards.
-                            let event = ShardEvent::NewSession { address, 
transport };
-                            // TODO: Fixme look inside of 
broadcast_event_to_all_shards method.
-                            let _responses = 
shard_clone.broadcast_event_to_all_shards(event.into());
-
-                            let client_id = session.client_id;
-                            info!("Created new session: {session}");
-                            let acceptor = acceptor.clone();
-                            
-                            let conn_stop_receiver = 
shard_clone.task_registry.add_connection(client_id);
-
-                            let shard_for_conn = shard_clone.clone();
-                            shard_clone.task_registry.spawn_tracked(async move 
{
-                                match acceptor.accept(stream).await {
-                                    Ok(tls_stream) => {
-                                        let mut sender = 
SenderKind::get_tcp_tls_sender(tls_stream.into());
-                                        if let Err(error) = 
handle_connection(&session, &mut sender, &shard_for_conn, 
conn_stop_receiver).await {
-                                            handle_error(error);
-                                        }
-                                        
shard_for_conn.task_registry.remove_connection(&client_id);
-
-                                        if let Err(error) = 
sender.shutdown().await {
-                                            error!(
-                                                "Failed to shutdown TCP TLS 
stream for client: {client_id}, address: {address}. {error}"
-                                            );
-                                        } else {
-                                            info!(
-                                                "Successfully closed TCP TLS 
stream for client: {client_id}, address: {address}."
-                                            );
-                                        }
+        let accept_future = listener.accept();
+        futures::select! {
+            _ = shutdown_check.fuse() => {
+                info!("TCP TLS server detected shutdown flag, no longer 
accepting connections");
+                break;
+            }
+            result = accept_future.fuse() => {
+                match result {
+                    Ok((stream, address)) => {
+                        if shard.is_shutting_down() {
+                            info!("Rejecting new TLS connection from {} during 
shutdown", address);
+                            continue;
+                        }
+                        let shard_clone = shard.clone();
+                        info!("Accepted new TCP TLS connection: {}", address);
+                        let transport = Transport::Tcp;
+                        let session = shard_clone.add_client(&address, 
transport);
+                        //TODO: Those can be shared with other shards.
+                        shard_clone.add_active_session(session.clone());
+                        // Broadcast session to all shards.
+                        let event = ShardEvent::NewSession { address, 
transport };
+                        // TODO: Fixme look inside of 
broadcast_event_to_all_shards method.
+                        let _responses = 
shard_clone.broadcast_event_to_all_shards(event.into());
+
+                        let client_id = session.client_id;
+                        info!("Created new session: {session}");
+                        let acceptor = acceptor.clone();
+
+                        let conn_stop_receiver = 
shard_clone.task_registry.add_connection(client_id);
+
+                        let shard_for_conn = shard_clone.clone();
+                        shard_clone.task_registry.spawn_tracked(async move {
+                            match acceptor.accept(stream).await {
+                                Ok(tls_stream) => {
+                                    let mut sender = 
SenderKind::get_tcp_tls_sender(tls_stream.into());
+                                    if let Err(error) = 
handle_connection(&session, &mut sender, &shard_for_conn, 
conn_stop_receiver).await {
+                                        handle_error(error);
                                     }
-                                    Err(e) => {
-                                        error!("Failed to accept TLS 
connection from '{address}': {e}");
-                                        
shard_for_conn.task_registry.remove_connection(&client_id);
+                                    
shard_for_conn.task_registry.remove_connection(&client_id);
+
+                                    if let Err(error) = 
sender.shutdown().await {
+                                        error!(
+                                            "Failed to shutdown TCP TLS stream 
for client: {client_id}, address: {address}. {error}"
+                                        );
+                                    } else {
+                                        info!(
+                                            "Successfully closed TCP TLS 
stream for client: {client_id}, address: {address}."
+                                        );
                                     }
                                 }
-                            });
-                        }
-                        Err(error) => error!("Unable to accept TCP TLS socket. 
{error}"),
+                                Err(e) => {
+                                    error!("Failed to accept TLS connection 
from '{address}': {e}");
+                                    
shard_for_conn.task_registry.remove_connection(&client_id);
+                                }
+                            }
+                        });
                     }
+                    Err(error) => error!("Unable to accept TCP TLS socket. 
{error}"),
                 }
             }
         }
+    }
+    */
     Ok(())
 }
 
diff --git a/core/server/src/tcp/tcp_tls_sender.rs 
b/core/server/src/tcp/tcp_tls_sender.rs
index b214e5a6..8dbb3972 100644
--- a/core/server/src/tcp/tcp_tls_sender.rs
+++ b/core/server/src/tcp/tcp_tls_sender.rs
@@ -17,41 +17,46 @@
  */
 
 use crate::binary::sender::Sender;
+use crate::streaming::utils::PooledBuffer;
 use crate::tcp::COMPONENT;
 use crate::{server_error::ServerError, tcp::sender};
 use bytes::BytesMut;
+use compio::buf::IoBufMut;
+use compio::io::AsyncWrite;
+use compio::net::TcpStream;
 use error_set::ErrContext;
 use iggy_common::IggyError;
-use monoio::buf::IoBufMut;
-use monoio::io::AsyncWriteRent;
-use monoio::net::TcpStream;
-use monoio_rustls::ServerTlsStream;
 //use tokio_rustls::server::TlsStream;
 use nix::libc;
 
 #[derive(Debug)]
 pub struct TcpTlsSender {
-    pub(crate) stream: ServerTlsStream<TcpStream>,
+    pub(crate) stream: TcpStream,
 }
 
 impl Sender for TcpTlsSender {
     async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<usize, 
IggyError>, B) {
+        todo!();
         sender::read(&mut self.stream, buffer).await
     }
 
     async fn send_empty_ok_response(&mut self) -> Result<(), IggyError> {
+        todo!();
         sender::send_empty_ok_response(&mut self.stream).await
     }
 
     async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(), 
IggyError> {
+        todo!();
         sender::send_ok_response(&mut self.stream, payload).await
     }
 
     async fn send_error_response(&mut self, error: IggyError) -> Result<(), 
IggyError> {
+        todo!();
         sender::send_error_response(&mut self.stream, error).await
     }
 
     async fn shutdown(&mut self) -> Result<(), ServerError> {
+        todo!();
         self.stream
             .shutdown()
             .await
@@ -64,8 +69,9 @@ impl Sender for TcpTlsSender {
     async fn send_ok_response_vectored(
         &mut self,
         length: &[u8],
-        slices: Vec<libc::iovec>,
+        slices: Vec<PooledBuffer>,
     ) -> Result<(), IggyError> {
+        todo!();
         sender::send_ok_response_vectored(&mut self.stream, length, 
slices).await
     }
 }

Reply via email to