This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new 4f07d68c feat(io_uring): move to compio runtime (#1960)
4f07d68c is described below
commit 4f07d68c87c6556fcbc82bdd59653ab1783990d1
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Fri Jul 4 15:27:28 2025 +0200
feat(io_uring): move to compio runtime (#1960)
---
Cargo.lock | 418 ++++++++++++++-------
core/server/Cargo.toml | 4 +-
.../handlers/messages/poll_messages_handler.rs | 25 +-
.../binary/handlers/topics/create_topic_handler.rs | 3 +-
core/server/src/binary/sender.rs | 17 +-
core/server/src/bootstrap.rs | 35 +-
.../src/compat/index_rebuilding/index_rebuilder.rs | 42 ++-
core/server/src/io/file.rs | 117 ------
core/server/src/io/fs_utils.rs | 2 +-
core/server/src/io/mod.rs | 3 -
core/server/src/io/reader.rs | 44 ---
core/server/src/io/writer.rs | 57 ---
core/server/src/log/runtime.rs | 48 +++
core/server/src/main.rs | 52 ++-
core/server/src/quic/quic_sender.rs | 5 +-
core/server/src/shard/mod.rs | 15 +-
core/server/src/shard/system/snapshot/mod.rs | 50 ++-
core/server/src/shard/system/storage.rs | 11 +-
core/server/src/shard/system/streams.rs | 2 +-
core/server/src/shard/system/topics.rs | 35 +-
core/server/src/shard/task_registry.rs | 8 +-
core/server/src/shard/tasks/messages.rs | 2 +-
core/server/src/shard/transmission/event.rs | 3 +-
core/server/src/state/file.rs | 36 +-
core/server/src/streaming/partitions/storage.rs | 27 +-
core/server/src/streaming/persistence/persister.rs | 24 +-
core/server/src/streaming/persistence/task.rs | 10 +-
.../src/streaming/segments/indexes/index_reader.rs | 16 +-
.../src/streaming/segments/indexes/index_writer.rs | 15 +-
.../streaming/segments/messages/messages_reader.rs | 20 +-
.../streaming/segments/messages/messages_writer.rs | 23 +-
core/server/src/streaming/segments/messages/mod.rs | 32 +-
core/server/src/streaming/segments/segment.rs | 6 +-
core/server/src/streaming/streams/storage.rs | 6 +-
core/server/src/streaming/topics/storage.rs | 6 +-
core/server/src/streaming/utils/file.rs | 14 +-
core/server/src/streaming/utils/pooled_buffer.rs | 34 +-
core/server/src/tcp/connection_handler.rs | 1 +
core/server/src/tcp/sender.rs | 62 +--
core/server/src/tcp/tcp_listener.rs | 12 +-
core/server/src/tcp/tcp_sender.rs | 9 +-
core/server/src/tcp/tcp_tls_listener.rs | 185 ++++-----
core/server/src/tcp/tcp_tls_sender.rs | 18 +-
43 files changed, 809 insertions(+), 745 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index ccecbd7b..d9383e8a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -142,7 +142,7 @@ dependencies = [
"actix-utils",
"futures-core",
"futures-util",
- "mio 1.0.4",
+ "mio",
"socket2",
"tokio",
"tracing",
@@ -307,6 +307,15 @@ dependencies = [
"memchr",
]
+[[package]]
+name = "aligned-array"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e05c92d086290f52938013f6242ac62bf7d401fab8ad36798a609faa65c3fd2c"
+dependencies = [
+ "generic-array",
+]
+
[[package]]
name = "alloc-no-stdlib"
version = "2.0.4"
@@ -578,6 +587,12 @@ dependencies = [
"syn 2.0.104",
]
+[[package]]
+name = "async-task"
+version = "4.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de"
+
[[package]]
name = "async-trait"
version = "0.1.88"
@@ -643,17 +658,6 @@ dependencies = [
"webpki-roots 0.26.11",
]
-[[package]]
-name = "auto-const-array"
-version = "0.2.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fd73835ad7deb4bd2b389e6f10333b143f025d607c55ca04c66a0bcc6bb2fc6d"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn 2.0.104",
-]
-
[[package]]
name = "autocfg"
version = "1.5.0"
@@ -1529,6 +1533,216 @@ dependencies = [
"static_assertions",
]
+[[package]]
+name = "compio"
+version = "0.15.0"
+source =
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc"
+dependencies = [
+ "compio-buf",
+ "compio-dispatcher",
+ "compio-driver",
+ "compio-fs",
+ "compio-io",
+ "compio-log",
+ "compio-macros",
+ "compio-net",
+ "compio-process",
+ "compio-quic",
+ "compio-runtime",
+ "compio-signal",
+ "compio-tls",
+]
+
+[[package]]
+name = "compio-buf"
+version = "0.6.0"
+source =
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc"
+dependencies = [
+ "arrayvec",
+ "bytes",
+ "libc",
+]
+
+[[package]]
+name = "compio-dispatcher"
+version = "0.7.0"
+source =
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc"
+dependencies = [
+ "compio-driver",
+ "compio-runtime",
+ "flume",
+ "futures-channel",
+]
+
+[[package]]
+name = "compio-driver"
+version = "0.8.1"
+source =
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc"
+dependencies = [
+ "aligned-array",
+ "cfg-if",
+ "cfg_aliases",
+ "compio-buf",
+ "compio-log",
+ "crossbeam-channel",
+ "crossbeam-queue",
+ "futures-util",
+ "io-uring",
+ "io_uring_buf_ring",
+ "libc",
+ "once_cell",
+ "paste",
+ "polling",
+ "slab",
+ "socket2",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-fs"
+version = "0.8.0"
+source =
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc"
+dependencies = [
+ "cfg-if",
+ "cfg_aliases",
+ "compio-buf",
+ "compio-driver",
+ "compio-io",
+ "compio-runtime",
+ "libc",
+ "os_pipe",
+ "widestring",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-io"
+version = "0.7.0"
+source =
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc"
+dependencies = [
+ "compio-buf",
+ "futures-util",
+ "paste",
+ "pin-project-lite",
+]
+
+[[package]]
+name = "compio-log"
+version = "0.1.0"
+source =
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc"
+dependencies = [
+ "tracing",
+]
+
+[[package]]
+name = "compio-macros"
+version = "0.1.2"
+source =
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc"
+dependencies = [
+ "proc-macro-crate 3.3.0",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.104",
+]
+
+[[package]]
+name = "compio-net"
+version = "0.8.0"
+source =
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc"
+dependencies = [
+ "cfg-if",
+ "compio-buf",
+ "compio-driver",
+ "compio-io",
+ "compio-runtime",
+ "either",
+ "libc",
+ "once_cell",
+ "socket2",
+ "widestring",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-process"
+version = "0.5.0"
+source =
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc"
+dependencies = [
+ "cfg-if",
+ "compio-buf",
+ "compio-driver",
+ "compio-io",
+ "compio-runtime",
+ "futures-util",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-quic"
+version = "0.4.0"
+source =
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc"
+dependencies = [
+ "cfg_aliases",
+ "compio-buf",
+ "compio-io",
+ "compio-log",
+ "compio-net",
+ "compio-runtime",
+ "flume",
+ "futures-util",
+ "libc",
+ "quinn-proto",
+ "rustc-hash 2.1.1",
+ "rustls",
+ "thiserror 2.0.12",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-runtime"
+version = "0.8.1"
+source =
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc"
+dependencies = [
+ "async-task",
+ "cfg-if",
+ "compio-buf",
+ "compio-driver",
+ "compio-log",
+ "crossbeam-queue",
+ "futures-util",
+ "libc",
+ "once_cell",
+ "scoped-tls",
+ "slab",
+ "socket2",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-signal"
+version = "0.6.0"
+source =
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc"
+dependencies = [
+ "compio-buf",
+ "compio-driver",
+ "compio-runtime",
+ "libc",
+ "once_cell",
+ "os_pipe",
+ "slab",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-tls"
+version = "0.6.0"
+source =
"git+https://github.com/compio-rs/compio.git?rev=5504e02ec2e821e9b024006224c34c06646410cc#5504e02ec2e821e9b024006224c34c06646410cc"
+dependencies = [
+ "compio-buf",
+ "compio-io",
+ "rustls",
+]
+
[[package]]
name = "concurrent-queue"
version = "2.5.0"
@@ -1696,6 +1910,17 @@ version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
+[[package]]
+name = "core_affinity"
+version = "0.8.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a034b3a7b624016c6e13f5df875747cc25f884156aad2abd12b6c46797971342"
+dependencies = [
+ "libc",
+ "num_cpus",
+ "winapi",
+]
+
[[package]]
name = "cpufeatures"
version = "0.2.17"
@@ -1846,7 +2071,7 @@ version = "3.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46f93780a459b7d656ef7f071fe699c4d3d2cb201c4b24d085b6ddc505276e73"
dependencies = [
- "nix 0.30.1",
+ "nix",
"windows-sys 0.59.0",
]
@@ -2682,15 +2907,6 @@ dependencies = [
"slab",
]
-[[package]]
-name = "fxhash"
-version = "0.2.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
-dependencies = [
- "byteorder",
-]
-
[[package]]
name = "generator"
version = "0.8.5"
@@ -4143,14 +4359,26 @@ dependencies = [
[[package]]
name = "io-uring"
-version = "0.6.4"
+version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "595a0399f411a508feb2ec1e970a4a30c249351e30208960d58298de8660b0e5"
+checksum = "b86e202f00093dcba4275d4636b93ef9dd75d025ae560d2521b45ea28ab49013"
dependencies = [
- "bitflags 1.3.2",
+ "bitflags 2.9.1",
+ "cfg-if",
"libc",
]
+[[package]]
+name = "io_uring_buf_ring"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8a8867874ff5758b47c1dac069e6e86541432f9da8be9111c5e94154134f07d0"
+dependencies = [
+ "bytes",
+ "io-uring",
+ "rustix 1.0.7",
+]
+
[[package]]
name = "ipnet"
version = "2.11.0"
@@ -4657,15 +4885,6 @@ version = "2.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0"
-[[package]]
-name = "memoffset"
-version = "0.7.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4"
-dependencies = [
- "autocfg",
-]
-
[[package]]
name = "mimalloc"
version = "0.1.47"
@@ -4715,18 +4934,6 @@ dependencies = [
"adler2",
]
-[[package]]
-name = "mio"
-version = "0.8.11"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
-dependencies = [
- "libc",
- "log",
- "wasi 0.11.1+wasi-snapshot-preview1",
- "windows-sys 0.48.0",
-]
-
[[package]]
name = "mio"
version = "1.0.4"
@@ -4787,62 +4994,6 @@ dependencies = [
"uuid",
]
-[[package]]
-name = "monoio"
-version = "0.2.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3bd0f8bcde87b1949f95338b547543fcab187bc7e7a5024247e359a5e828ba6a"
-dependencies = [
- "auto-const-array",
- "bytes",
- "flume",
- "fxhash",
- "io-uring",
- "libc",
- "memchr",
- "mio 0.8.11",
- "monoio-macros",
- "nix 0.26.4",
- "once_cell",
- "pin-project-lite",
- "socket2",
- "threadpool",
- "windows-sys 0.48.0",
-]
-
-[[package]]
-name = "monoio-io-wrapper"
-version = "0.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4bcfaa76e5daf87cc4d31b4d1b6bc93c12db59c19df50b9200afdbde42077655"
-dependencies = [
- "monoio",
-]
-
-[[package]]
-name = "monoio-macros"
-version = "0.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "176a5f5e69613d9e88337cf2a65e11135332b4efbcc628404a7c555e4452084c"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn 2.0.104",
-]
-
-[[package]]
-name = "monoio-rustls"
-version = "0.4.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6e31f422825bd7fb19957af6eaf89d7234ba143fcc0e515f5a2f526e332d1875"
-dependencies = [
- "bytes",
- "monoio",
- "monoio-io-wrapper",
- "rustls",
- "thiserror 1.0.69",
-]
-
[[package]]
name = "nanorand"
version = "0.7.0"
@@ -4858,19 +5009,6 @@ version = "6.6.666"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf5a574dadd7941adeaa71823ecba5e28331b8313fb2e1c6a5c7e5981ea53ad6"
-[[package]]
-name = "nix"
-version = "0.26.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b"
-dependencies = [
- "bitflags 1.3.2",
- "cfg-if",
- "libc",
- "memoffset",
- "pin-utils",
-]
-
[[package]]
name = "nix"
version = "0.30.1"
@@ -4935,7 +5073,7 @@ dependencies = [
"kqueue",
"libc",
"log",
- "mio 1.0.4",
+ "mio",
"notify-types",
"walkdir",
"windows-sys 0.59.0",
@@ -5351,6 +5489,16 @@ dependencies = [
"hashbrown 0.14.5",
]
+[[package]]
+name = "os_pipe"
+version = "1.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "db335f4760b14ead6290116f2427bf33a14d4f0617d49f78a246de10c1831224"
+dependencies = [
+ "libc",
+ "windows-sys 0.59.0",
+]
+
[[package]]
name = "overload"
version = "0.1.1"
@@ -5602,6 +5750,21 @@ version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c"
+[[package]]
+name = "polling"
+version = "3.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b53a684391ad002dd6a596ceb6c74fd004fdce75f4be2e3f615068abbea5fd50"
+dependencies = [
+ "cfg-if",
+ "concurrent-queue",
+ "hermit-abi",
+ "pin-project-lite",
+ "rustix 1.0.7",
+ "tracing",
+ "windows-sys 0.59.0",
+]
+
[[package]]
name = "polonius-the-crab"
version = "0.2.1"
@@ -6874,7 +7037,9 @@ dependencies = [
"bytes",
"chrono",
"clap",
+ "compio",
"console-subscriber",
+ "core_affinity",
"crossbeam",
"ctrlc",
"dashmap",
@@ -6894,9 +7059,7 @@ dependencies = [
"mimalloc",
"mockall",
"moka",
- "monoio",
- "monoio-rustls",
- "nix 0.30.1",
+ "nix",
"once_cell",
"opentelemetry",
"opentelemetry-appender-tracing",
@@ -7409,15 +7572,6 @@ dependencies = [
"cfg-if",
]
-[[package]]
-name = "threadpool"
-version = "1.8.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
-dependencies = [
- "num_cpus",
-]
-
[[package]]
name = "time"
version = "0.3.41"
@@ -7494,7 +7648,7 @@ dependencies = [
"backtrace",
"bytes",
"libc",
- "mio 1.0.4",
+ "mio",
"parking_lot 0.12.4",
"pin-project-lite",
"signal-hook-registry",
@@ -8330,6 +8484,12 @@ dependencies = [
"safe_arch",
]
+[[package]]
+name = "widestring"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dd7cf3379ca1aac9eea11fba24fd7e315d621f8dfe35c8d7d2be8b793726e07d"
+
[[package]]
name = "winapi"
version = "0.3.9"
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index f91ac15c..57ab996a 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -63,12 +63,12 @@ human-repr = { workspace = true }
iggy_common = { workspace = true }
jsonwebtoken = "9.3.1"
socket2 = "0.5.10"
+core_affinity = "0.8.0"
lending-iterator = "0.1.7"
hash32 = "1.0.0"
mimalloc = { workspace = true, optional = true }
moka = { version = "0.12.10", features = ["future"] }
-monoio = { version = "0.2.4", features = ["mkdirat", "unlinkat", "renameat",
"sync"] }
-monoio-rustls = "0.4.0"
+compio = { git = "https://github.com/compio-rs/compio.git", rev =
"5504e02ec2e821e9b024006224c34c06646410cc", features = ["runtime", "macros",
"io-uring", "time", "rustls"] }
nix = { version = "0.30", features = ["fs"] }
once_cell = "1.21.3"
opentelemetry = { version = "0.30.0", features = ["trace", "logs"] }
diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
index dfc4a89f..2f02a039 100644
--- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
@@ -27,8 +27,10 @@ use crate::shard::transmission::frame::ShardResponse;
use crate::shard::transmission::message::{ShardMessage, ShardRequest,
ShardRequestPayload};
use crate::streaming::segments::IggyMessagesBatchSet;
use crate::streaming::session::Session;
+use crate::streaming::utils::PooledBuffer;
use crate::to_iovec;
use anyhow::Result;
+use bytes::BytesMut;
use error_set::ErrContext;
use iggy_common::{IggyError, PollMessages};
use std::io::IoSlice;
@@ -76,7 +78,7 @@ impl ServerCommandHandler for PollMessages {
let user_id = session.get_user_id();
let client_id = session.client_id;
- let (metadata, batch) = shard
+ let (metadata, mut batch) = shard
.poll_messages(
client_id,
user_id,
@@ -96,16 +98,19 @@ impl ServerCommandHandler for PollMessages {
let response_length = 4 + 8 + 4 + batch.size();
let response_length_bytes = response_length.to_le_bytes();
- let partition_id = metadata.partition_id.to_le_bytes();
- let current_offset = metadata.current_offset.to_le_bytes();
- let count = batch.count().to_le_bytes();
+ let mut bufs = Vec::with_capacity(batch.containers_count() + 5);
+ let mut partition_id_buf = PooledBuffer::with_capacity(4);
+ let mut current_offset_buf = PooledBuffer::with_capacity(8);
+ let mut count_buf = PooledBuffer::with_capacity(4);
+ partition_id_buf.put_u32_le(metadata.partition_id);
+ current_offset_buf.put_u64_le(metadata.current_offset);
+ count_buf.put_u32_le(batch.count());
- let mut iovecs = Vec::with_capacity(batch.containers_count() + 3);
- iovecs.push(to_iovec(&partition_id));
- iovecs.push(to_iovec(&current_offset));
- iovecs.push(to_iovec(&count));
+ bufs.push(partition_id_buf);
+ bufs.push(current_offset_buf);
+ bufs.push(count_buf);
- iovecs.extend(batch.iter().map(|m| to_iovec(&m)));
+ batch.iter_mut().for_each(|m| bufs.push(m.take_messages()));
trace!(
"Sending {} messages to client ({} bytes) to client",
batch.count(),
@@ -113,7 +118,7 @@ impl ServerCommandHandler for PollMessages {
);
sender
- .send_ok_response_vectored(&response_length_bytes, iovecs)
+ .send_ok_response_vectored(&response_length_bytes, bufs)
.await?;
Ok(())
}
diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs
b/core/server/src/binary/handlers/topics/create_topic_handler.rs
index 6785e4f0..51088b55 100644
--- a/core/server/src/binary/handlers/topics/create_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs
@@ -48,7 +48,7 @@ impl ServerCommandHandler for CreateTopic {
debug!("session: {session}, command: {self}");
let stream_id = self.stream_id.clone();
let topic_id = self.topic_id;
- let created_topic_id = shard
+ let (shards_assignment, created_topic_id) = shard
.create_topic(
session,
&self.stream_id,
@@ -72,6 +72,7 @@ impl ServerCommandHandler for CreateTopic {
compression_algorithm: self.compression_algorithm,
max_topic_size: self.max_topic_size,
replication_factor: self.replication_factor,
+ shards_assignment,
};
// Broadcast the event to all shards.
let _responses = shard.broadcast_event_to_all_shards(event.into());
diff --git a/core/server/src/binary/sender.rs b/core/server/src/binary/sender.rs
index 45066c75..96bed3d5 100644
--- a/core/server/src/binary/sender.rs
+++ b/core/server/src/binary/sender.rs
@@ -19,17 +19,17 @@
use std::future::Future;
use std::io::IoSlice;
+use crate::streaming::utils::PooledBuffer;
use crate::tcp::tcp_sender::TcpSender;
use crate::tcp::tcp_tls_sender::TcpTlsSender;
use crate::{quic::quic_sender::QuicSender, server_error::ServerError};
use bytes::BytesMut;
+use compio::buf::{IoBuf, IoBufMut};
+use compio::io::{AsyncReadExt, AsyncWriteExt};
+use compio::net::TcpStream;
use iggy_common::IggyError;
-use monoio::buf::IoBufMut;
-use monoio::io::{AsyncReadRent, AsyncWriteRent};
-use monoio::net::TcpStream;
use nix::libc;
use quinn::{RecvStream, SendStream};
-use monoio_rustls::{ServerTlsStream, TlsStream};
macro_rules! forward_async_methods {
(
@@ -65,7 +65,7 @@ pub trait Sender {
fn send_ok_response_vectored(
&mut self,
length: &[u8],
- slices: Vec<libc::iovec>,
+ slices: Vec<PooledBuffer>,
) -> impl Future<Output = Result<(), IggyError>>;
fn send_error_response(
&mut self,
@@ -85,8 +85,9 @@ impl SenderKind {
Self::Tcp(TcpSender { stream })
}
- pub fn get_tcp_tls_sender(stream: ServerTlsStream<TcpStream>) -> Self {
- Self::TcpTls(TcpTlsSender { stream })
+ pub fn get_tcp_tls_sender(stream: ()) -> Self {
+ todo!();
+ //Self::TcpTls(TcpTlsSender { stream })
}
pub fn get_quic_sender(send_stream: SendStream, recv_stream: RecvStream)
-> Self {
@@ -100,7 +101,7 @@ impl SenderKind {
async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<usize,
IggyError>, B);
async fn send_empty_ok_response(&mut self) -> Result<(), IggyError>;
async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(),
IggyError>;
- async fn send_ok_response_vectored(&mut self, length: &[u8], slices:
Vec<libc::iovec>) -> Result<(), IggyError>;
+ async fn send_ok_response_vectored(&mut self, length: &[u8], slices:
Vec<PooledBuffer>) -> Result<(), IggyError>;
async fn send_error_response(&mut self, error: IggyError) ->
Result<(), IggyError>;
async fn shutdown(&mut self) -> Result<(), ServerError>;
}
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index ddd0599c..6a2eb8af 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -1,3 +1,4 @@
+use compio::{fs::create_dir_all, runtime::Runtime};
use iggy_common::{
IggyError,
defaults::{
@@ -5,7 +6,6 @@ use iggy_common::{
MIN_PASSWORD_LENGTH, MIN_USERNAME_LENGTH,
},
};
-use monoio::{Buildable, Driver, Runtime, fs::create_dir_all, time::TimeDriver};
use tracing::info;
use crate::{
@@ -121,24 +121,21 @@ pub fn create_root_user() -> User {
user
}
-pub fn create_default_executor<D>() -> Runtime<D>
-where
- D: Driver + Buildable,
-{
- let builder = monoio::RuntimeBuilder::<D>::new();
- let rt = Buildable::build(builder).expect("Failed to create default
runtime");
- rt
-}
-
-pub fn create_shard_executor() -> Runtime<TimeDriver<monoio::IoUringDriver>> {
- // TODO: Figure out what else we could tweak there
- // We for sure want to disable the userspace interrupts on new cq entry
(set_coop_taskrun)
- // TODO: Shall we make the size of ring be configureable ?
- let builder = monoio::RuntimeBuilder::<monoio::IoUringDriver>::new()
- //.uring_builder(urb.setup_coop_taskrun()) // broken shit.
- .with_entries(1024) // Default size
- .enable_timer();
- let rt = Buildable::build(builder).expect("Failed to create default
runtime");
+pub fn create_shard_executor() -> Runtime {
+ //TODO: The event intererval tick, could be configureed based on the fact
+ // How many clients we expect to have connected.
+ // This roughly estimates the number of tasks we will create.
+ let proactor = compio::driver::ProactorBuilder::new()
+ .capacity(4096)
+ .coop_taskrun(true)
+ .taskrun_flag(false) //TODO: Try enabling this.
+ .thread_pool_limit(0)
+ .to_owned();
+ let rt = compio::runtime::RuntimeBuilder::new()
+ .with_proactor(proactor)
+ .event_interval(69)
+ .build()
+ .unwrap();
rt
}
diff --git a/core/server/src/compat/index_rebuilding/index_rebuilder.rs
b/core/server/src/compat/index_rebuilding/index_rebuilder.rs
index 1eebdacd..dbc93ebe 100644
--- a/core/server/src/compat/index_rebuilding/index_rebuilder.rs
+++ b/core/server/src/compat/index_rebuilding/index_rebuilder.rs
@@ -16,12 +16,14 @@
* under the License.
*/
-use crate::io::file::IggyFile;
-use crate::io::writer::IggyWriter;
+use crate::server_error::CompatError;
use crate::streaming::utils::file;
-use crate::{io::reader::IggyReader, server_error::CompatError};
+use async_zip::tokio::write;
+use compio::{
+ fs::File,
+ io::{AsyncBufRead, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt,
BufReader, BufWriter},
+};
use iggy_common::{IGGY_MESSAGE_HEADER_SIZE, IggyMessageHeader};
-use monoio::io::{AsyncReadRent, AsyncReadRentExt, AsyncWriteRent,
AsyncWriteRentExt};
use std::io::{Seek, SeekFrom};
pub struct IndexRebuilder {
@@ -40,17 +42,17 @@ impl IndexRebuilder {
}
async fn read_message_header(
- reader: &mut IggyReader<IggyFile>,
+ reader: &mut BufReader<std::io::Cursor<File>>,
) -> Result<IggyMessageHeader, std::io::Error> {
let buf = [0u8; IGGY_MESSAGE_HEADER_SIZE];
- let (result, buf) = reader.read_exact(Box::new(buf)).await;
+ let (result, buf) = reader.read_exact(Box::new(buf)).await.into();
result?;
IggyMessageHeader::from_raw_bytes(&*buf)
.map_err(|_| std::io::Error::from(std::io::ErrorKind::InvalidData))
}
async fn write_index_entry(
- writer: &mut IggyWriter<IggyFile>,
+ writer: &mut BufWriter<std::io::Cursor<File>>,
header: &IggyMessageHeader,
position: usize,
start_offset: u64,
@@ -58,26 +60,34 @@ impl IndexRebuilder {
// Write offset (4 bytes) - base_offset + last_offset_delta -
start_offset
let offset = start_offset - header.offset;
debug_assert!(offset <= u32::MAX as u64);
- let (result, _) =
writer.write_all(Box::new(offset.to_le_bytes())).await;
+ let (result, _) = writer
+ .write_all(Box::new(offset.to_le_bytes()))
+ .await
+ .into();
result?;
// Write position (4 bytes)
- let (result, _) =
writer.write_all(Box::new(position.to_le_bytes())).await;
+ let (result, _) = writer
+ .write_all(Box::new(position.to_le_bytes()))
+ .await
+ .into();
result?;
// Write timestamp (8 bytes)
let (result, _) = writer
.write_all(Box::new(header.timestamp.to_le_bytes()))
- .await;
+ .await
+ .into();
result?;
Ok(())
}
pub async fn rebuild(&self) -> Result<(), CompatError> {
- let mut reader =
-
IggyReader::new(IggyFile::new(file::open(&self.messages_file_path).await?));
- let mut writer =
IggyWriter::new(IggyFile::new(file::overwrite(&self.index_path).await?));
+ let read_cursor =
std::io::Cursor::new(file::open(&self.messages_file_path).await?);
+ let write_cursor =
std::io::Cursor::new(file::overwrite(&self.index_path).await?);
+ let mut reader = BufReader::new(read_cursor);
+ let mut writer = BufWriter::new(write_cursor);
let mut position = 0;
let mut next_position;
@@ -93,9 +103,9 @@ impl IndexRebuilder {
.await?;
// Skip message payload and headers
- reader.seek(SeekFrom::Current(
- header.payload_length as i64 +
header.user_headers_length as i64,
- ))?;
+ reader.consume(
+ header.payload_length as usize +
header.user_headers_length as usize,
+ );
// Update position for next iteration
position = next_position;
diff --git a/core/server/src/io/file.rs b/core/server/src/io/file.rs
deleted file mode 100644
index 95129aa8..00000000
--- a/core/server/src/io/file.rs
+++ /dev/null
@@ -1,117 +0,0 @@
-use std::io::SeekFrom;
-
-use monoio::{
- BufResult,
- buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut, IoVecWrapper},
- fs::{File, Metadata},
- io::{AsyncReadRent, AsyncWriteRent},
-};
-
-/// Wrapper around `monoio::fs::File` to provide a consistent API for reading
and writing files
-/// in an asynchronous context. This struct maintains the current position in
the file and provides
-/// methods to read and write data at specific positions.
-#[derive(Debug)]
-pub struct IggyFile {
- file: File,
- position: u64,
-}
-
-impl From<File> for IggyFile {
- fn from(file: File) -> Self {
- Self { file, position: 0 }
- }
-}
-
-impl std::io::Seek for IggyFile {
- /// This method doesn't do bound checking aswell as, the `End` variant is
not supported.
- fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
- self.position = match pos {
- SeekFrom::Start(n) => n as i64,
- SeekFrom::End(_) => unreachable!("End variant is not supported in
IggyFile"),
- SeekFrom::Current(n) => self.position as i64 + n,
- } as u64;
- Ok(self.position)
- }
-}
-
-impl IggyFile {
- pub fn new(file: File) -> Self {
- Self { file, position: 0 }
- }
-
- pub async fn metadata(&self) -> std::io::Result<Metadata> {
- self.file.metadata().await
- }
-
- pub async fn write_at<T: IoBuf>(&self, buf: T, pos: usize) ->
BufResult<usize, T> {
- self.file.write_at(buf, pos as u64).await
- }
-
- pub async fn write_all_at<T: IoBuf>(&self, buf: T, pos: u64) ->
BufResult<(), T> {
- self.file.write_all_at(buf, pos).await
- }
-
- pub async fn read_exact_at<T: IoBufMut>(&self, buf: T, pos: u64) ->
BufResult<(), T> {
- self.file.read_exact_at(buf, pos).await
- }
-
- pub async fn read_at<T: IoBufMut>(&self, buf: T, pos: usize) ->
BufResult<usize, T> {
- self.file.read_at(buf, pos as u64).await
- }
-
- pub async fn sync_all(&self) -> std::io::Result<()> {
- self.file.sync_all().await
- }
-}
-
-impl AsyncReadRent for IggyFile {
- /// Reads `exactly` `buf.len()` bytes into the buffer.
- async fn read<T: IoBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
- let (res, buf) = self.file.read_at(buf, self.position).await;
- let n = match res {
- Ok(n) => n,
- Err(e) => return (Err(e), buf),
- };
- self.position += n as u64;
- (Ok(n), buf)
- }
-
- //TODO(numinex) - maybe implement this ?
- fn readv<T: IoVecBufMut>(&mut self, buf: T) -> impl Future<Output =
BufResult<usize, T>> {
- async move { (Ok(0), buf) }
- }
-}
-
-impl AsyncWriteRent for IggyFile {
- /// Writes entire buffer to the file at the current position.
- async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
- let (res, buf) = self.file.write_at(buf, self.position).await;
- let n = match res {
- Ok(n) => n,
- Err(e) => return (Err(e), buf),
- };
- self.position += n as u64;
- (Ok(n), buf)
- }
-
- // This is bait!!!!
- async fn writev<T: IoVecBuf>(&mut self, buf_vec: T) -> BufResult<usize, T>
{
- unimplemented!("writev is not implemented for IggyFile");
- }
-
- async fn flush(&mut self) -> std::io::Result<()> {
- self.file.sync_all().await
- }
-
- //TODO(numinex) - How to implement this ?
- async fn shutdown(&mut self) -> std::io::Result<()> {
- panic!("shutdown is not supported for IggyFile");
- /*
- self.file.sync_all().await;
- unsafe {
- let file = *self.file;
- file.close().await
- }
- */
- }
-}
diff --git a/core/server/src/io/fs_utils.rs b/core/server/src/io/fs_utils.rs
index d9be21e7..6bbc1b7f 100644
--- a/core/server/src/io/fs_utils.rs
+++ b/core/server/src/io/fs_utils.rs
@@ -16,7 +16,7 @@
* under the License.
*/
-use monoio::fs;
+use compio::fs;
use std::io;
use std::path::{Path, PathBuf};
diff --git a/core/server/src/io/mod.rs b/core/server/src/io/mod.rs
index 686717cd..c3140c9e 100644
--- a/core/server/src/io/mod.rs
+++ b/core/server/src/io/mod.rs
@@ -1,4 +1 @@
-pub mod file;
pub mod fs_utils;
-pub mod reader;
-pub mod writer;
diff --git a/core/server/src/io/reader.rs b/core/server/src/io/reader.rs
deleted file mode 100644
index 63443214..00000000
--- a/core/server/src/io/reader.rs
+++ /dev/null
@@ -1,44 +0,0 @@
-use std::io::{Seek, SeekFrom};
-
-use monoio::{
- BufResult,
- buf::{IoBufMut, IoVecBufMut},
- io::{AsyncReadRent, BufReader},
-};
-
-pub struct IggyReader<R: AsyncReadRent + Seek> {
- inner: BufReader<R>,
-}
-
-impl<R: AsyncReadRent + Seek> IggyReader<R> {
- pub fn new(reader: R) -> Self {
- Self {
- inner: BufReader::new(reader),
- }
- }
-
- pub fn with_capacity(capacity: usize, reader: R) -> Self {
- Self {
- inner: BufReader::with_capacity(capacity, reader),
- }
- }
-}
-
-impl<R: AsyncReadRent + Seek> Seek for IggyReader<R> {
- fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
- self.inner.get_mut().seek(pos)
- }
-}
-
-impl<R> AsyncReadRent for IggyReader<R>
-where
- R: AsyncReadRent + Seek,
-{
- fn read<T: IoBufMut>(&mut self, buf: T) -> impl Future<Output =
BufResult<usize, T>> {
- self.inner.read(buf)
- }
-
- fn readv<T: IoVecBufMut>(&mut self, buf: T) -> impl Future<Output =
BufResult<usize, T>> {
- self.inner.readv(buf)
- }
-}
diff --git a/core/server/src/io/writer.rs b/core/server/src/io/writer.rs
deleted file mode 100644
index e249e405..00000000
--- a/core/server/src/io/writer.rs
+++ /dev/null
@@ -1,57 +0,0 @@
-use std::io::{Seek, SeekFrom};
-
-use monoio::{
- BufResult,
- io::{AsyncWriteRent, BufWriter},
-};
-
-pub struct IggyWriter<W: AsyncWriteRent + Seek> {
- inner: BufWriter<W>,
-}
-
-impl<W: AsyncWriteRent + Seek> IggyWriter<W> {
- pub fn new(writer: W) -> Self {
- Self {
- inner: BufWriter::new(writer),
- }
- }
-
- pub fn with_capacity(capacity: usize, writer: W) -> Self {
- Self {
- inner: BufWriter::with_capacity(capacity, writer),
- }
- }
-}
-
-impl<W: AsyncWriteRent + Seek> Seek for IggyWriter<W> {
- fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
- self.inner.get_mut().seek(pos)
- }
-}
-
-impl<W> AsyncWriteRent for IggyWriter<W>
-where
- W: AsyncWriteRent + Seek,
-{
- fn write<T: monoio::buf::IoBuf>(
- &mut self,
- buf: T,
- ) -> impl Future<Output = BufResult<usize, T>> {
- self.inner.write(buf)
- }
-
- fn writev<T: monoio::buf::IoVecBuf>(
- &mut self,
- buf_vec: T,
- ) -> impl Future<Output = BufResult<usize, T>> {
- self.inner.writev(buf_vec)
- }
-
- fn flush(&mut self) -> impl Future<Output = std::io::Result<()>> {
- self.inner.flush()
- }
-
- fn shutdown(&mut self) -> impl Future<Output = std::io::Result<()>> {
- self.inner.shutdown()
- }
-}
diff --git a/core/server/src/log/runtime.rs b/core/server/src/log/runtime.rs
new file mode 100644
index 00000000..36bbdd80
--- /dev/null
+++ b/core/server/src/log/runtime.rs
@@ -0,0 +1,48 @@
+use std::{pin::Pin, time::Duration};
+
+use futures::{FutureExt, SinkExt, Stream};
+use opentelemetry_sdk::runtime::{Runtime, RuntimeChannel, TrySend};
+
+#[derive(Clone)]
+pub struct MonoioRuntime;
+
+impl Runtime for MonoioRuntime {
+ fn spawn<F>(&self, future: F)
+ where
+ F: Future<Output = ()> + Send + 'static,
+ {
+ // TODO: This won't work, we init OpenTelemetry in the main thread,
when there is no instance of monoio runtime
+ // running yet....
+ monoio::spawn(future);
+ }
+
+ fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send +
'static {
+ let sleep = Sleep::new(duration);
+ sleep
+ }
+}
+
+
+pub struct Sleep {
+ pub inner: Pin<Box<monoio::time::Sleep>>,
+}
+
+impl Sleep {
+ pub fn new(duration: Duration) -> Self {
+ Self {
+ inner: Box::pin(monoio::time::sleep(duration)),
+ }
+ }
+}
+
+impl Future for Sleep {
+ type Output = ();
+
+ fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut
std::task::Context<'_>) -> std::task::Poll<Self::Output> {
+ self.inner.as_mut().poll_unpin(cx)
+ }
+}
+
+// Safety: There is no way for `Sleep` future to be flipped like a burger to
another thread,
+// because we create instance of OpenTelemetry SDK runtime in the main thread,
and monoio futures don't require Send & Sync bounds.
+unsafe impl Send for Sleep {}
\ No newline at end of file
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 53fa2d7b..5d5f8698 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -30,8 +30,8 @@ use iggy_common::defaults::DEFAULT_ROOT_USER_ID;
use iggy_common::{Aes256GcmEncryptor, EncryptorKind, IggyError};
use server::args::Args;
use server::bootstrap::{
- create_default_executor, create_directories, create_root_user,
create_shard_connections,
- create_shard_executor, load_config, resolve_persister,
+ create_directories, create_root_user, create_shard_connections,
create_shard_executor,
+ load_config, resolve_persister,
};
use server::configs::config_provider::{self};
use server::configs::server::ServerConfig;
@@ -84,7 +84,8 @@ fn main() -> Result<(), ServerError> {
let config_provider = config_provider::resolve(&args.config_provider)?;
// Load config and create directories.
// Remove `local_data` directory if run with `--fresh` flag.
- let mut rt = create_default_executor::<monoio::IoUringDriver>();
+ // TODO: Replace this once we use `monoio::main` macro.
+ let rt = create_shard_executor();
let config = rt
.block_on(async {
let config = load_config(&config_provider)
@@ -94,7 +95,7 @@ fn main() -> Result<(), ServerError> {
})?;
if args.fresh {
let system_path = config.system.get_system_path();
- if monoio::fs::metadata(&system_path).await.is_ok() {
+ if compio::fs::metadata(&system_path).await.is_ok() {
println!(
"Removing system path at: {} because `--fresh` flag
was set",
system_path
@@ -122,6 +123,7 @@ fn main() -> Result<(), ServerError> {
// TODO: Make this configurable from config as a range
// for example this instance of Iggy will use cores from 0..4
+ let core_ids = core_affinity::get_core_ids().unwrap();
let available_cpus = available_parallelism().expect("Failed to get num of
cores");
let shards_count = available_cpus.into();
let shards_set = 0..shards_count;
@@ -136,14 +138,24 @@ fn main() -> Result<(), ServerError> {
let config = config.clone();
let state_persister =
resolve_persister(config.system.state.enforce_fsync);
+ let core_ids = core_ids.clone();
let handle = std::thread::Builder::new()
.name(format!("shard-{id}"))
.spawn(move || {
+ let core_ids = core_ids.clone();
MemoryPool::init_pool(config.system.clone());
- monoio::utils::bind_to_cpu_set(Some(shard_id))
+ if core_ids.len() > shard_id {
+ core_affinity::set_for_current(core_ids[shard_id]);
+ }
+
+ // TODO: Fix this once this PR gets resolved.
+ // https://github.com/compio-rs/compio/pull/445
+ /*
+ compio::utils::bind_to_cpu_set(Some(shard_id))
.unwrap_or_else(|e| panic!("Failed to set CPU affinity for
shard-{id}: {e}"));
+ */
- let mut rt = create_shard_executor();
+ let rt = create_shard_executor();
rt.block_on(async move {
let version = SemanticVersion::current().expect("Invalid
version");
info!(
@@ -253,20 +265,22 @@ fn main() -> Result<(), ServerError> {
}
let shutdown_handles_for_signal = shutdown_handles.clone();
- ctrlc::set_handler(move || {
- info!("Received shutdown signal (SIGTERM/SIGINT), initiating graceful
shutdown...");
-
- for (shard_id, stop_sender) in &shutdown_handles_for_signal {
- info!("Sending shutdown signal to shard {}", shard_id);
- if let Err(e) = stop_sender.send_blocking(()) {
- error!(
- "Failed to send shutdown signal to shard {}: {}",
- shard_id, e
- );
+ /*
+ ctrlc::set_handler(move || {
+ info!("Received shutdown signal (SIGTERM/SIGINT), initiating
graceful shutdown...");
+
+ for (shard_id, stop_sender) in &shutdown_handles_for_signal {
+ info!("Sending shutdown signal to shard {}", shard_id);
+ if let Err(e) = stop_sender.send_blocking(()) {
+ error!(
+ "Failed to send shutdown signal to shard {}: {}",
+ shard_id, e
+ );
+ }
}
- }
- })
- .expect("Error setting Ctrl-C handler");
+ })
+ .expect("Error setting Ctrl-C handler");
+ */
info!("Iggy server is running. Press Ctrl+C or send SIGTERM to shutdown.");
for (idx, handle) in handles.into_iter().enumerate() {
diff --git a/core/server/src/quic/quic_sender.rs
b/core/server/src/quic/quic_sender.rs
index c6403349..54bda604 100644
--- a/core/server/src/quic/quic_sender.rs
+++ b/core/server/src/quic/quic_sender.rs
@@ -17,11 +17,12 @@
*/
use crate::quic::COMPONENT;
+use crate::streaming::utils::PooledBuffer;
use crate::{binary::sender::Sender, server_error::ServerError};
use bytes::BytesMut;
+use compio::buf::{IoBuf, IoBufMut};
use error_set::ErrContext;
use iggy_common::IggyError;
-use monoio::buf::IoBufMut;
use nix::libc;
use quinn::{RecvStream, SendStream};
use std::io::IoSlice;
@@ -71,7 +72,7 @@ impl Sender for QuicSender {
async fn send_ok_response_vectored(
&mut self,
length: &[u8],
- slices: Vec<libc::iovec>,
+ slices: Vec<PooledBuffer>,
) -> Result<(), IggyError> {
debug!("Sending vectored response with status: {:?}...", STATUS_OK);
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 8ffc6560..2178d2b3 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -110,6 +110,7 @@ impl Shard {
}
}
+#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct ShardInfo {
id: u16,
}
@@ -118,6 +119,10 @@ impl ShardInfo {
pub fn new(id: u16) -> Self {
Self { id }
}
+
+ pub fn id(&self) -> u16 {
+ self.id
+ }
}
pub struct IggyShard {
@@ -193,7 +198,8 @@ impl IggyShard {
let stop_receiver = self.get_stop_receiver();
let shard_for_shutdown = self.clone();
- monoio::spawn(async move {
+ /*
+ compio::runtime::spawn(async move {
let _ = stop_receiver.recv().await;
info!("Shard {} received shutdown signal", shard_for_shutdown.id);
@@ -207,6 +213,7 @@ impl IggyShard {
);
}
});
+ */
let result = try_join_all(tasks).await;
result?;
@@ -506,7 +513,7 @@ impl IggyShard {
async fn handle_event(&self, event: Arc<ShardEvent>) -> Result<(),
IggyError> {
match &*event {
ShardEvent::CreatedStream { stream_id, name } => {
- self.create_stream_bypass_auth(*stream_id, name).await
+ self.create_stream_bypass_auth(*stream_id, name)
}
ShardEvent::CreatedTopic {
stream_id,
@@ -517,6 +524,7 @@ impl IggyShard {
compression_algorithm,
max_topic_size,
replication_factor,
+ shards_assignment,
} => {
self.create_topic_bypass_auth(
stream_id,
@@ -527,6 +535,7 @@ impl IggyShard {
*compression_algorithm,
*max_topic_size,
*replication_factor,
+ shards_assignment.clone()
)
.await
}
@@ -604,7 +613,7 @@ impl IggyShard {
pub fn insert_shard_table_records(
&self,
- records: impl Iterator<Item = (IggyNamespace, ShardInfo)>,
+ records: impl IntoIterator<Item = (IggyNamespace, ShardInfo)>,
) {
self.shards_table.borrow_mut().extend(records);
}
diff --git a/core/server/src/shard/system/snapshot/mod.rs
b/core/server/src/shard/system/snapshot/mod.rs
index eb726509..f6bf7857 100644
--- a/core/server/src/shard/system/snapshot/mod.rs
+++ b/core/server/src/shard/system/snapshot/mod.rs
@@ -23,14 +23,14 @@ use crate::shard::IggyShard;
use crate::streaming::session::Session;
use async_zip::tokio::write::ZipFileWriter;
use async_zip::{Compression, ZipEntryBuilder};
+use compio::fs::{File, OpenOptions};
+use compio::io::{AsyncReadAtExt, AsyncWriteAtExt};
use iggy_common::{IggyDuration, IggyError, Snapshot, SnapshotCompression,
SystemSnapshotType};
-use monoio::fs::{File, OpenOptions};
use std::io::Cursor;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use tempfile::NamedTempFile;
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::Command;
use tokio_util::compat::TokioAsyncWriteCompatExt;
use tracing::{error, info};
@@ -59,6 +59,8 @@ impl IggyShard {
// and impl the monoio async writer, based on this example:
// https://youtu.be/RYHYiXMJdZI?si=d2roKeHn5lJrw2ri&t=1140
// and rc-zip-tokio crate.
+
+ /*
let cursor = Cursor::new(Vec::new());
let mut zip_writer = ZipFileWriter::new(cursor.compat_write());
@@ -80,13 +82,17 @@ impl IggyShard {
let filename = format!("{snapshot_type}.txt");
let entry = ZipEntryBuilder::new(filename.clone().into(),
compression);
- let file = File::open(temp_file.path()).await.map_err(|e| {
- error!("Failed to open temporary file: {}", e);
- IggyError::SnapshotFileCompletionFailed
- })?;
+ let file = OpenOptions::new()
+ .read(true)
+ .open(temp_file.path())
+ .await
+ .map_err(|e| {
+ error!("Failed to open temporary file: {}", e);
+ IggyError::SnapshotFileCompletionFailed
+ })?;
let content = Vec::new();
- let (result, content) = file.read_exact_at(content,
0).await;
+ let (result, content) = file.read_exact_at(content,
0).await.into();
if let Err(e) = result {
error!("Failed to read temporary file: {}", e);
continue;
@@ -112,7 +118,9 @@ impl IggyShard {
}
}
}
+ */
+ /*
info!(
"Snapshot commands {:?} finished in {}",
snapshot_types,
@@ -128,7 +136,8 @@ impl IggyShard {
let zip_data = cursor.into_inner();
info!("Final zip size: {} bytes", zip_data.len());
- Ok(Snapshot::new(zip_data))
+ */
+ Ok(Snapshot::new(vec![]))
}
}
@@ -137,8 +146,11 @@ async fn write_command_output_to_temp_file(
) -> Result<NamedTempFile, std::io::Error> {
let output = command.output().await?;
let temp_file = NamedTempFile::new()?;
- let file = File::from_std(temp_file.as_file().try_clone()?).unwrap();
- let (result, _) = file.write_all_at(output.stdout, 0).await;
+ let mut file = OpenOptions::new()
+ .write(true)
+ .open(temp_file.path())
+ .await?;
+ let (result, _) = file.write_all_at(output.stdout, 0).await.into();
result?;
file.sync_all().await?;
Ok(temp_file)
@@ -150,33 +162,39 @@ async fn get_filesystem_overview() ->
Result<NamedTempFile, std::io::Error> {
async fn get_process_info() -> Result<NamedTempFile, std::io::Error> {
let temp_file = NamedTempFile::new()?;
- let file = File::from_std(temp_file.as_file().try_clone()?).unwrap();
+ let mut file = OpenOptions::new()
+ .create(true)
+ .write(true)
+ .open(temp_file.path())
+ .await?;
let mut position = 0;
let ps_output = Command::new("ps").arg("aux").output().await?;
let (result, written) = file
.write_all_at(b"=== Process List (ps aux) ===\n", 0)
- .await;
+ .await
+ .into();
result?;
position += written.len() as u64;
- let (result, written) = file.write_all_at(ps_output.stdout,
position).await;
+ let (result, written) = file.write_all_at(ps_output.stdout,
position).await.into();
result?;
position += written.len() as u64;
- let (result, written) = file.write_all_at(b"\n\n", position).await;
+ let (result, written) = file.write_all_at(b"\n\n", position).await.into();
result?;
position += written.len() as u64;
let (result, written) = file
.write_all_at(b"=== Detailed Process Information ===\n", position)
- .await;
+ .await
+ .into();
result?;
position += written.len() as u64;
let proc_info = procdump::get_proc_info().await?;
let bytes = proc_info.as_bytes().to_owned();
- let (result, _) = file.write_all_at(bytes, position).await;
+ let (result, _) = file.write_all_at(bytes, position).await.into();
result?;
file.sync_all().await?;
diff --git a/core/server/src/shard/system/storage.rs
b/core/server/src/shard/system/storage.rs
index 3d895d4c..e1e7cf84 100644
--- a/core/server/src/shard/system/storage.rs
+++ b/core/server/src/shard/system/storage.rs
@@ -17,18 +17,17 @@
*/
use super::COMPONENT;
-use crate::io::file::IggyFile;
use crate::shard::system::info::SystemInfo;
use crate::streaming::persistence::persister::PersisterKind;
use crate::streaming::storage::SystemInfoStorage;
use crate::streaming::utils::PooledBuffer;
use crate::streaming::utils::file;
use anyhow::Context;
+use compio::io::AsyncReadAtExt;
+use compio::io::AsyncReadExt;
use error_set::ErrContext;
use iggy_common::IggyError;
-use monoio::io::AsyncReadRentExt;
use std::sync::Arc;
-use tokio::io::AsyncReadExt;
use tracing::info;
#[derive(Debug)]
@@ -63,10 +62,12 @@ impl SystemInfoStorage for FileSystemInfoStorage {
.map_err(|_| IggyError::CannotReadFileMetadata)?
.len() as usize;
- let mut file = IggyFile::new(file);
+ let file = file::open(&self.path)
+ .await
+ .map_err(|_| IggyError::CannotReadFile)?;
let mut buffer = PooledBuffer::with_capacity(file_size);
buffer.put_bytes(0, file_size);
- let (result, buffer) = file.read_exact(buffer).await;
+ let (result, buffer) = file.read_exact_at(buffer, 0).await.into();
result
.with_error_context(|error| {
format!(
diff --git a/core/server/src/shard/system/streams.rs
b/core/server/src/shard/system/streams.rs
index b4921066..bc8fc2bb 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -186,7 +186,7 @@ impl IggyShard {
}))
}
- pub async fn create_stream_bypass_auth(
+ pub fn create_stream_bypass_auth(
&self,
stream_id: Option<u32>,
name: &str,
diff --git a/core/server/src/shard/system/topics.rs
b/core/server/src/shard/system/topics.rs
index 0f8155fb..d71b0f1d 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -112,8 +112,9 @@ impl IggyShard {
compression_algorithm: CompressionAlgorithm,
max_topic_size: MaxTopicSize,
replication_factor: Option<u8>,
+ shards_assignment: Vec<(IggyNamespace, ShardInfo)>,
) -> Result<(), IggyError> {
- let (topic_id, partition_ids) = self.create_topic_base(
+ let (topic_id, _) = self.create_topic_base(
stream_id,
topic_id,
name,
@@ -132,22 +133,16 @@ impl IggyShard {
.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to get topic
with ID: {topic_id} in stream with ID: {stream_id}")
})?;
- topic.persist().await.with_error_context(|error| {
- format!("{COMPONENT} (error: {error}) - failed to persist topic:
{topic}")
- })?;
- // TODO: Figure out a way how to distribute the shards table among
different shards,
- // without the need to do code from below, everytime we handle a
`ShardEvent`.
- // I think we shouldn't be sharing it tho and still maintain a single
shard table per shard,
- // but find a way how to distribute it smarter (maybe move the
broadcast inside of the `insert_shard_table_records`) method.
- let records = partition_ids.into_iter().map(|partition_id| {
- let namespace = IggyNamespace::new(stream_id, topic_id,
partition_id);
- let hash = namespace.generate_hash();
- let shard_id = hash % self.get_available_shards_count();
- let shard_info = ShardInfo::new(shard_id as u16);
- (namespace, shard_info)
- });
- self.insert_shard_table_records(records);
+ for (_, shard_info) in &shards_assignment {
+ if shard_info.id() == self.id {
+ topic.persist().await.with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to persist
topic: {topic}")
+ })?;
+ }
+ }
+
+ self.insert_shard_table_records(shards_assignment);
self.metrics.increment_topics(1);
self.metrics.increment_partitions(partitions_count);
@@ -167,7 +162,7 @@ impl IggyShard {
compression_algorithm: CompressionAlgorithm,
max_topic_size: MaxTopicSize,
replication_factor: Option<u8>,
- ) -> Result<Identifier, IggyError> {
+ ) -> Result<(Vec<(IggyNamespace, ShardInfo)>, Identifier), IggyError> {
self.ensure_authenticated(session)?;
{
let stream = self.get_stream(stream_id).with_error_context(|error|
{
@@ -208,6 +203,7 @@ impl IggyShard {
format!("{COMPONENT} (error: {error}) - failed to persist topic:
{topic}")
})?;
+ // TODO: Refactor
let records = partition_ids.into_iter().map(|partition_id| {
let namespace = IggyNamespace::new(stream_id, topic_id,
partition_id);
let hash = namespace.generate_hash();
@@ -215,13 +211,14 @@ impl IggyShard {
let shard_info = ShardInfo::new(shard_id as u16);
(namespace, shard_info)
});
- self.insert_shard_table_records(records);
+ let records = records.collect::<Vec<_>>();
+ self.insert_shard_table_records(records.clone());
self.metrics.increment_topics(1);
self.metrics.increment_partitions(partitions_count);
self.metrics.increment_segments(partitions_count);
- Ok(Identifier::numeric(topic_id)?)
+ Ok((records, Identifier::numeric(topic_id)?))
}
fn create_topic_base(
diff --git a/core/server/src/shard/task_registry.rs
b/core/server/src/shard/task_registry.rs
index 1a201d3b..ff674a8e 100644
--- a/core/server/src/shard/task_registry.rs
+++ b/core/server/src/shard/task_registry.rs
@@ -17,8 +17,8 @@
*/
use async_channel::{Receiver, Sender, bounded};
+use compio::runtime::JoinHandle;
use futures::future::join_all;
-use monoio::task::JoinHandle;
use std::cell::RefCell;
use std::collections::HashMap;
use std::future::Future;
@@ -42,7 +42,7 @@ impl TaskRegistry {
where
F: Future<Output = ()> + 'static,
{
- let handle = monoio::spawn(future);
+ let handle = compio::runtime::spawn(future);
self.tasks.borrow_mut().push(handle);
}
@@ -85,8 +85,8 @@ impl TaskRegistry {
.into_iter()
.enumerate()
.map(|(idx, handle)| async move {
- match monoio::time::timeout(timeout, handle).await {
- Ok(()) => (idx, true),
+ match compio::time::timeout(timeout, handle).await {
+ Ok(_) => (idx, true),
Err(_) => {
warn!("Task {} did not complete within timeout", idx);
(idx, false)
diff --git a/core/server/src/shard/tasks/messages.rs
b/core/server/src/shard/tasks/messages.rs
index ec4018c0..d573c058 100644
--- a/core/server/src/shard/tasks/messages.rs
+++ b/core/server/src/shard/tasks/messages.rs
@@ -14,7 +14,7 @@ async fn run_shard_messages_receiver(shard: Rc<IggyShard>) ->
Result<(), IggyErr
if shard.is_shutting_down() {
return;
}
- monoio::time::sleep(Duration::from_millis(100)).await;
+ compio::time::sleep(Duration::from_millis(100)).await;
}
};
diff --git a/core/server/src/shard/transmission/event.rs
b/core/server/src/shard/transmission/event.rs
index 49794c3f..60a28dfc 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -2,7 +2,7 @@ use std::net::SocketAddr;
use iggy_common::{CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize};
-use crate::streaming::clients::client_manager::Transport;
+use crate::{shard::{namespace::IggyNamespace, ShardInfo},
streaming::clients::client_manager::Transport};
#[derive(Debug)]
pub enum ShardEvent {
@@ -24,6 +24,7 @@ pub enum ShardEvent {
compression_algorithm: CompressionAlgorithm,
max_topic_size: MaxTopicSize,
replication_factor: Option<u8>,
+ shards_assignment: Vec<(IggyNamespace, ShardInfo)>,
},
//CreatedConsumerGroup(Identifier, Identifier, Option<u32>, String),
//DeletedConsumerGroup(Identifier, Identifier, Identifier),
diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs
index e8eb6ee6..5716d7bb 100644
--- a/core/server/src/state/file.rs
+++ b/core/server/src/state/file.rs
@@ -16,28 +16,27 @@
* under the License.
*/
-use crate::io::file::IggyFile;
-use crate::io::reader::IggyReader;
use crate::state::command::EntryCommand;
use crate::state::{COMPONENT, State, StateEntry};
use crate::streaming::persistence::persister::PersisterKind;
use crate::streaming::utils::file;
use crate::versioning::SemanticVersion;
use bytes::{Buf, BufMut, Bytes, BytesMut};
+use compio::fs::File;
+use compio::io::{AsyncReadExt, AsyncWriteExt};
use error_set::ErrContext;
use iggy_common::BytesSerializable;
use iggy_common::EncryptorKind;
use iggy_common::IggyByteSize;
use iggy_common::IggyError;
use iggy_common::IggyTimestamp;
-use monoio::io::AsyncReadRentExt;
use std::fmt::Debug;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use tracing::{debug, error, info};
-pub const BUF_READER_CAPACITY_BYTES: usize = 512 * 1000;
+pub const BUF_cursor_CAPACITY_BYTES: usize = 512 * 1000;
const FILE_STATE_PARSE_ERROR: &str = "STATE - failed to parse file state";
#[derive(Debug)]
@@ -129,7 +128,6 @@ impl State for FileState {
.map_err(|_| IggyError::CannotReadFileMetadata)?
.len();
- let file = IggyFile::new(file);
if file_size == 0 {
info!("State file is empty");
return Ok(Vec::new());
@@ -141,11 +139,11 @@ impl State for FileState {
);
let mut entries = Vec::new();
let mut total_size: u64 = 0;
- let mut reader = IggyReader::with_capacity(BUF_READER_CAPACITY_BYTES,
file);
+ let mut cursor = std::io::Cursor::new(file);
let mut current_index = 0;
let mut entries_count = 0;
loop {
- let index = reader
+ let index = cursor
.read_u64_le()
.await
.with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR}
index. {error}"))
@@ -163,32 +161,32 @@ impl State for FileState {
current_index = index;
entries_count += 1;
- let term = reader
+ let term = cursor
.read_u64_le()
.await
.with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR}
term. {error}"))
.map_err(|_| IggyError::InvalidNumberEncoding)?;
total_size += 8;
- let leader_id = reader
+ let leader_id = cursor
.read_u32_le()
.await
.with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR}
leader_id. {error}"))
.map_err(|_| IggyError::InvalidNumberEncoding)?;
total_size += 4;
- let version = reader
+ let version = cursor
.read_u32_le()
.await
.with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR}
version. {error}"))
.map_err(|_| IggyError::InvalidNumberEncoding)?;
total_size += 4;
- let flags = reader
+ let flags = cursor
.read_u64_le()
.await
.with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR}
flags. {error}"))
.map_err(|_| IggyError::InvalidNumberEncoding)?;
total_size += 8;
let timestamp = IggyTimestamp::from(
- reader
+ cursor
.read_u64_le()
.await
.with_error_context(|error| {
@@ -197,19 +195,19 @@ impl State for FileState {
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
total_size += 8;
- let user_id = reader
+ let user_id = cursor
.read_u32_le()
.await
.with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR}
user_id. {error}"))
.map_err(|_| IggyError::InvalidNumberEncoding)?;
total_size += 4;
- let checksum = reader
+ let checksum = cursor
.read_u32_le()
.await
.with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR}
checksum. {error}"))
.map_err(|_| IggyError::InvalidNumberEncoding)?;
total_size += 4;
- let context_length = reader
+ let context_length = cursor
.read_u32_le()
.await
.with_error_context(|error| {
@@ -220,20 +218,20 @@ impl State for FileState {
total_size += 4;
let mut context = BytesMut::with_capacity(context_length);
context.put_bytes(0, context_length);
- let (result, context) = reader.read_exact(context).await;
+ let (result, context) = cursor.read_exact(context).await.into();
result
.with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR}
code. {error}"))
.map_err(|_| IggyError::CannotReadFile)?;
let context = context.freeze();
total_size += context_length as u64;
- let code = reader
+ let code = cursor
.read_u32_le()
.await
.with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR}
code. {error}"))
.map_err(|_| IggyError::InvalidNumberEncoding)?;
total_size += 4;
- let mut command_length = reader
+ let mut command_length = cursor
.read_u32_le()
.await
.with_error_context(|error| {
@@ -244,7 +242,7 @@ impl State for FileState {
total_size += 4;
let mut command = BytesMut::with_capacity(command_length);
command.put_bytes(0, command_length);
- let (result, command) = reader.read_exact(command).await;
+ let (result, command) = cursor.read_exact(command).await.into();
result
.with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR}
command. {error}"))
.map_err(|_| IggyError::CannotReadFile)?;
diff --git a/core/server/src/streaming/partitions/storage.rs
b/core/server/src/streaming/partitions/storage.rs
index 6621ea19..67e3d3f4 100644
--- a/core/server/src/streaming/partitions/storage.rs
+++ b/core/server/src/streaming/partitions/storage.rs
@@ -18,7 +18,6 @@
use crate::compat::index_rebuilding::index_rebuilder::IndexRebuilder;
use crate::configs::cache_indexes::CacheIndexesConfig;
-use crate::io::file::IggyFile;
use crate::io::fs_utils;
use crate::state::system::PartitionState;
use crate::streaming::partitions::COMPONENT;
@@ -27,12 +26,12 @@ use crate::streaming::persistence::persister::PersisterKind;
use crate::streaming::segments::*;
use crate::streaming::storage::PartitionStorage;
use crate::streaming::utils::file;
+use compio::fs;
+use compio::fs::create_dir_all;
+use compio::io::AsyncReadExt;
use error_set::ErrContext;
use iggy_common::ConsumerKind;
use iggy_common::IggyError;
-use monoio::fs;
-use monoio::fs::create_dir_all;
-use monoio::io::AsyncReadRentExt;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::Ordering;
@@ -117,8 +116,18 @@ impl PartitionStorage for FilePartitionStorage {
let messages_file_path = segment.messages_file_path().to_owned();
let time_index_path = index_path.replace(INDEX_EXTENSION,
"timeindex");
- let index_path_exists =
tokio::fs::try_exists(&index_path).await.unwrap();
- let time_index_path_exists =
tokio::fs::try_exists(&time_index_path).await.unwrap();
+ // TODO: Move to fs_utils
+ async fn try_exists(index_path: &str) -> Result<bool,
std::io::Error> {
+ match compio::fs::metadata(index_path).await {
+ Ok(_) => Ok(true),
+ Err(err) => match err.kind() {
+ std::io::ErrorKind::NotFound => Ok(false),
+ _ => Err(err),
+ },
+ }
+ }
+ let index_path_exists = try_exists(&index_path).await.unwrap();
+ let time_index_path_exists =
try_exists(&time_index_path).await.unwrap();
let index_cache_enabled = matches!(
partition.config.segment.cache_indexes,
CacheIndexesConfig::All | CacheIndexesConfig::OpenSegment
@@ -416,6 +425,7 @@ impl PartitionStorage for FilePartitionStorage {
let mut consumer_offsets = Vec::new();
let mut dir_entries = dir_entries.unwrap();
+ // TODO: reimplement using the compio dir walk
while let Some(dir_entry) = dir_entries.next() {
let dir_entry = dir_entry.unwrap();
let metadata = dir_entry.metadata();
@@ -451,8 +461,9 @@ impl PartitionStorage for FilePartitionStorage {
)
})
.map_err(|_| IggyError::CannotReadFile)?;
- let mut file = IggyFile::new(file);
- let offset = file
+ // TODO: This is like awful.
+ let mut cursor = std::io::Cursor::new(file);
+ let offset = cursor
.read_u64_le()
.await
.with_error_context(|error| {
diff --git a/core/server/src/streaming/persistence/persister.rs
b/core/server/src/streaming/persistence/persister.rs
index 6ba5c87b..ca522b85 100644
--- a/core/server/src/streaming/persistence/persister.rs
+++ b/core/server/src/streaming/persistence/persister.rs
@@ -18,12 +18,13 @@
use crate::streaming::persistence::COMPONENT;
use crate::streaming::utils::file;
+use compio::buf::IoBuf;
+use compio::fs::remove_file;
+use compio::io::AsyncWriteAtExt;
use error_set::ErrContext;
use iggy_common::IggyError;
-use monoio::buf::IoBuf;
use std::fmt::Debug;
use std::future::Future;
-use tokio::fs;
#[cfg(test)]
use mockall::automock;
@@ -85,13 +86,13 @@ pub struct FileWithSyncPersister;
impl Persister for FilePersister {
async fn append<B: IoBuf>(&self, path: &str, bytes: B) -> Result<(),
IggyError> {
- let file = file::append(path)
+ let (mut file, position) = file::append(path)
.await
.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to append to
file: {path}")
})
.map_err(|_| IggyError::CannotAppendToFile)?;
- file.write_all_at(bytes, 0)
+ file.write_all_at(bytes, position)
.await
.0
.with_error_context(|error| {
@@ -102,7 +103,7 @@ impl Persister for FilePersister {
}
async fn overwrite<B: IoBuf>(&self, path: &str, bytes: B) -> Result<(),
IggyError> {
- let file = file::overwrite(path)
+ let mut file = file::overwrite(path)
.await
.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to overwrite
file: {path}")
@@ -120,7 +121,7 @@ impl Persister for FilePersister {
}
async fn delete(&self, path: &str) -> Result<(), IggyError> {
- fs::remove_file(path)
+ remove_file(path)
.await
.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to delete file:
{path}")
@@ -132,17 +133,12 @@ impl Persister for FilePersister {
impl Persister for FileWithSyncPersister {
async fn append<B: IoBuf>(&self, path: &str, bytes: B) -> Result<(),
IggyError> {
- let file = file::append(path)
+ let (mut file, position) = file::append(path)
.await
.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to append to
file: {path}")
})
.map_err(|_| IggyError::CannotAppendToFile)?;
- let position = file
- .metadata()
- .await
- .map_err(|_| IggyError::CannotReadFileMetadata)?
- .len();
file.write_all_at(bytes, position)
.await
.0
@@ -162,7 +158,7 @@ impl Persister for FileWithSyncPersister {
}
async fn overwrite<B: IoBuf>(&self, path: &str, bytes: B) -> Result<(),
IggyError> {
- let file = file::overwrite(path)
+ let mut file = file::overwrite(path)
.await
.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to overwrite
file: {path}")
@@ -188,7 +184,7 @@ impl Persister for FileWithSyncPersister {
}
async fn delete(&self, path: &str) -> Result<(), IggyError> {
- fs::remove_file(path)
+ remove_file(path)
.await
.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to delete file:
{path}")
diff --git a/core/server/src/streaming/persistence/task.rs
b/core/server/src/streaming/persistence/task.rs
index d1c716aa..1b359ef7 100644
--- a/core/server/src/streaming/persistence/task.rs
+++ b/core/server/src/streaming/persistence/task.rs
@@ -18,18 +18,18 @@
use crate::streaming::persistence::COMPONENT;
use bytes::Bytes;
+use compio::runtime::Task;
use error_set::ErrContext;
use flume::{Receiver, Sender, unbounded};
use iggy_common::IggyError;
-use monoio::task;
-use std::{sync::Arc, time::Duration};
+use std::{any::Any, sync::Arc, time::Duration};
use tracing::error;
use super::persister::PersisterKind;
pub struct LogPersisterTask {
_sender: Option<Sender<Bytes>>,
- _task_handle: Option<task::JoinHandle<()>>,
+ _task_handle: Option<Task<Result<(), Box<dyn Any + Send>>>>,
}
impl std::fmt::Debug for LogPersisterTask {
@@ -50,7 +50,7 @@ impl LogPersisterTask {
) -> Self {
let (sender, receiver): (Sender<Bytes>, Receiver<Bytes>) = unbounded();
- let task_handle = monoio::spawn(async move {
+ let task_handle = compio::runtime::spawn(async move {
loop {
match receiver.recv_async().await {
Ok(data) => {
@@ -129,7 +129,7 @@ impl Drop for LogPersisterTask {
self._sender.take();
if let Some(handle) = self._task_handle.take() {
- monoio::spawn(async move { handle.await });
+ compio::runtime::spawn(async move { handle.await });
}
}
}
diff --git a/core/server/src/streaming/segments/indexes/index_reader.rs
b/core/server/src/streaming/segments/indexes/index_reader.rs
index 8b09666a..ce235ef4 100644
--- a/core/server/src/streaming/segments/indexes/index_reader.rs
+++ b/core/server/src/streaming/segments/indexes/index_reader.rs
@@ -17,13 +17,16 @@
*/
use super::IggyIndexesMut;
-use crate::{io::file::IggyFile, streaming::utils::PooledBuffer};
+use crate::streaming::utils::PooledBuffer;
use bytes::BytesMut;
+use compio::{
+ BufResult,
+ fs::{File, OpenOptions},
+ io::AsyncReadAtExt,
+};
use error_set::ErrContext;
use iggy_common::{INDEX_SIZE, IggyError, IggyIndex, IggyIndexView};
-use monoio::fs::OpenOptions;
use std::{
- fs::File as StdFile,
io::ErrorKind,
os::unix::fs::FileExt,
sync::{
@@ -37,7 +40,7 @@ use tracing::{error, trace};
#[derive(Debug)]
pub struct IndexReader {
file_path: String,
- file: IggyFile,
+ file: File,
index_size_bytes: Arc<AtomicU64>,
}
@@ -51,7 +54,6 @@ impl IndexReader {
.with_error_context(|error| format!("Failed to open index file:
{file_path}. {error}"))
.map_err(|_| IggyError::CannotReadFile)?;
- let file = IggyFile::new(file);
trace!(
"Opened index file for reading: {file_path}, size: {}",
index_size_bytes.load(Ordering::Acquire)
@@ -337,13 +339,13 @@ impl IndexReader {
if use_pool {
let mut buf = PooledBuffer::with_capacity(len as usize);
unsafe { buf.set_len(len as usize) };
- let (result, buf) = self.file.read_exact_at(buf, offset as
u64).await;
+ let (result, buf) = self.file.read_exact_at(buf, offset as
u64).await.into();
result?;
Ok(buf)
} else {
let mut buf = BytesMut::with_capacity(len as usize);
unsafe { buf.set_len(len as usize) };
- let (result, buf) = self.file.read_exact_at(buf, offset as
u64).await;
+ let (result, buf) = self.file.read_exact_at(buf, offset as
u64).await.into();
result?;
Ok(PooledBuffer::from_existing(buf))
}
diff --git a/core/server/src/streaming/segments/indexes/index_writer.rs
b/core/server/src/streaming/segments/indexes/index_writer.rs
index 4b79f9d2..416cde21 100644
--- a/core/server/src/streaming/segments/indexes/index_writer.rs
+++ b/core/server/src/streaming/segments/indexes/index_writer.rs
@@ -16,25 +16,25 @@
* under the License.
*/
+use compio::fs::File;
+use compio::fs::OpenOptions;
+use compio::io::AsyncWriteAtExt;
use error_set::ErrContext;
use iggy_common::INDEX_SIZE;
use iggy_common::IggyError;
-use monoio::fs::OpenOptions;
-use monoio::io::AsyncWriteRentExt;
use std::sync::{
Arc,
atomic::{AtomicU64, Ordering},
};
use tracing::trace;
-use crate::io::file::IggyFile;
use crate::streaming::utils::PooledBuffer;
/// A dedicated struct for writing to the index file.
#[derive(Debug)]
pub struct IndexWriter {
file_path: String,
- file: IggyFile,
+ file: File,
index_size_bytes: Arc<AtomicU64>,
fsync: bool,
}
@@ -48,15 +48,13 @@ impl IndexWriter {
file_exists: bool,
) -> Result<Self, IggyError> {
let file = OpenOptions::new()
- .write(true)
- .append(true)
.create(true)
+ .write(true)
.open(file_path)
.await
.with_error_context(|error| format!("Failed to open index file:
{file_path}. {error}"))
.map_err(|_| IggyError::CannotReadFile)?;
- let file = IggyFile::from(file);
if file_exists {
let _ = file.sync_all().await.with_error_context(|error| {
format!("Failed to fsync index file after creation:
{file_path}. {error}",)
@@ -96,8 +94,9 @@ impl IndexWriter {
let count = indexes.len() / INDEX_SIZE;
let len = indexes.len();
+ let position = self.index_size_bytes.load(Ordering::Relaxed);
self.file
- .write_all(indexes)
+ .write_all_at(indexes, position)
.await
.0
.with_error_context(|error| {
diff --git a/core/server/src/streaming/segments/messages/messages_reader.rs
b/core/server/src/streaming/segments/messages/messages_reader.rs
index 4091e8e3..e81eb793 100644
--- a/core/server/src/streaming/segments/messages/messages_reader.rs
+++ b/core/server/src/streaming/segments/messages/messages_reader.rs
@@ -16,13 +16,13 @@
* under the License.
*/
-use crate::io::file::IggyFile;
use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
-use crate::streaming::utils::{PooledBuffer, file};
+use crate::streaming::utils::PooledBuffer;
use bytes::BytesMut;
+use compio::fs::{File, OpenOptions};
+use compio::io::AsyncReadAtExt;
use error_set::ErrContext;
use iggy_common::IggyError;
-use monoio::fs::File;
use std::{
io::ErrorKind,
sync::{
@@ -36,7 +36,7 @@ use tracing::{error, trace};
#[derive(Debug)]
pub struct MessagesReader {
file_path: String,
- file: IggyFile,
+ file: File,
messages_size_bytes: Arc<AtomicU64>,
}
@@ -46,9 +46,12 @@ impl MessagesReader {
file_path: &str,
messages_size_bytes: Arc<AtomicU64>,
) -> Result<Self, IggyError> {
- let file = file::open_std(file_path)
+ let file = OpenOptions::new()
+ .read(true)
+ .open(file_path)
+ .await
.with_error_context(|error| {
- format!("Failed to open messages file: {file_path}, error:
{error}")
+ format!("Failed to open messages file: {file_path}. {error}")
})
.map_err(|_| IggyError::CannotReadFile)?;
@@ -67,7 +70,6 @@ impl MessagesReader {
)
});
}
- let file = File::from_std(file).unwrap();
trace!(
"Opened messages file for reading: {file_path}, size: {}",
@@ -177,13 +179,13 @@ impl MessagesReader {
if use_pool {
let mut buf = PooledBuffer::with_capacity(len as usize);
unsafe { buf.set_len(len as usize) };
- let (result, buf) = self.file.read_exact_at(buf, offset as
u64).await;
+ let (result, buf) = self.file.read_exact_at(buf, offset as
u64).await.into();
result?;
Ok(buf)
} else {
let mut buf = BytesMut::with_capacity(len as usize);
unsafe { buf.set_len(len as usize) };
- let (result, buf) = self.file.read_exact_at(buf, offset as
u64).await;
+ let (result, buf) = self.file.read_exact_at(buf, offset as
u64).await.into();
result?;
Ok(PooledBuffer::from_existing(buf))
}
diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs
b/core/server/src/streaming/segments/messages/messages_writer.rs
index 63e424a9..f3d43d1e 100644
--- a/core/server/src/streaming/segments/messages/messages_writer.rs
+++ b/core/server/src/streaming/segments/messages/messages_writer.rs
@@ -16,13 +16,10 @@
* under the License.
*/
-use crate::{
- io::file::IggyFile,
- streaming::segments::{IggyMessagesBatchSet, messages::write_batch},
-};
+use crate::streaming::segments::{IggyMessagesBatchSet, messages::write_batch};
+use compio::fs::{File, OpenOptions};
use error_set::ErrContext;
use iggy_common::{IggyByteSize, IggyError};
-use monoio::fs::{File, OpenOptions};
use std::sync::{
Arc,
atomic::{AtomicU64, Ordering},
@@ -34,7 +31,7 @@ use tracing::{error, trace};
pub struct MessagesWriter {
file_path: String,
/// Holds the file for synchronous writes; when asynchronous persistence
is enabled, this will be None.
- file: Option<IggyFile>,
+ file: Option<File>,
/// When set, asynchronous writes are handled by this persister task.
messages_size_bytes: Arc<AtomicU64>,
fsync: bool,
@@ -53,11 +50,13 @@ impl MessagesWriter {
file_exists: bool,
) -> Result<Self, IggyError> {
let file = OpenOptions::new()
- .write(true)
.create(true)
- .append(true)
+ .write(true)
.open(file_path)
.await
+ .with_error_context(|err| {
+ format!("Failed to open messages file: {file_path}, error:
{err}")
+ })
.map_err(|_| IggyError::CannotReadFile)?;
if file_exists {
@@ -82,7 +81,7 @@ impl MessagesWriter {
messages_size_bytes.load(Ordering::Acquire)
);
- let file = Some(IggyFile::new(file));
+ let file = Some(file);
Ok(Self {
file_path: file_path.to_string(),
file,
@@ -103,16 +102,16 @@ impl MessagesWriter {
"Saving batch set of size {messages_size} bytes
({containers_count} containers, {messages_count} messages) to messages file:
{}",
self.file_path
);
+ let position = self.messages_size_bytes.load(Ordering::Relaxed);
if let Some(ref mut file) = self.file {
- write_batch(file, &self.file_path, batch_set)
+ write_batch(file, position, batch_set)
.await
.with_error_context(|error| {
format!(
"Failed to write batch to messages file: {}. {error}",
self.file_path
)
- })
- .map_err(|_| IggyError::CannotWriteToFile)?;
+ })?;
} else {
error!("File handle is not available for synchronous write.");
return Err(IggyError::CannotWriteToFile);
diff --git a/core/server/src/streaming/segments/messages/mod.rs
b/core/server/src/streaming/segments/messages/mod.rs
index 9799608f..a3989a2e 100644
--- a/core/server/src/streaming/segments/messages/mod.rs
+++ b/core/server/src/streaming/segments/messages/mod.rs
@@ -19,37 +19,25 @@
mod messages_reader;
mod messages_writer;
-use crate::io::file::IggyFile;
-
use super::IggyMessagesBatchSet;
-use error_set::ErrContext;
+use compio::{fs::File, io::AsyncWriteAtExt};
use iggy_common::IggyError;
-use monoio::io::AsyncWriteRentExt;
pub use messages_reader::MessagesReader;
pub use messages_writer::MessagesWriter;
/// Vectored write a batches of messages to file
async fn write_batch(
- file: &mut IggyFile,
- file_path: &str,
+ file: &mut File,
+ position: u64,
mut batches: IggyMessagesBatchSet,
) -> Result<usize, IggyError> {
- //let mut slices = batches.iter().map(|b|
to_iovec(&b)).collect::<Vec<iovec>>();
- let mut total_written = 0;
- // TODO: Fork monoio, piece of shit runtime.
- for batch in batches.iter_mut() {
- let messages = batch.take_messages();
- let writen = file
- .write_all(messages)
- .await
- .0
- .with_error_context(|error| {
- format!("Failed to write messages to file: {file_path}, error:
{error}",)
- // TODO: Better error variant.
- })
- .map_err(|_| IggyError::CannotAppendMessage)?;
- total_written += writen;
- }
+ let total_written = batches.iter().map(|b| b.size() as usize).sum();
+ let batches = batches
+ .iter_mut()
+ .map(|b| b.take_messages())
+ .collect::<Vec<_>>();
+ let (result, _) = file.write_vectored_all_at(batches,
position).await.into();
+ result.map_err(|_| IggyError::CannotWriteToFile)?;
Ok(total_written)
}
diff --git a/core/server/src/streaming/segments/segment.rs
b/core/server/src/streaming/segments/segment.rs
index 9887df90..c13ce07a 100644
--- a/core/server/src/streaming/segments/segment.rs
+++ b/core/server/src/streaming/segments/segment.rs
@@ -21,6 +21,7 @@ use super::messages::*;
use super::messages_accumulator::MessagesAccumulator;
use crate::configs::system::SystemConfig;
use crate::streaming::segments::*;
+use compio::fs::remove_file;
use error_set::ErrContext;
use iggy_common::INDEX_SIZE;
use iggy_common::IggyByteSize;
@@ -30,7 +31,6 @@ use iggy_common::IggyTimestamp;
use std::rc::Rc;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
-use tokio::fs::remove_file;
use tracing::{info, warn};
const SIZE_16MB: usize = 16 * 1024 * 1024;
@@ -302,7 +302,7 @@ impl Segment {
pub async fn shutdown_writing(&mut self) {
if let Some(log_writer) = self.messages_writer.take() {
//TODO: Fixme not sure whether we should spawn a task here.
- monoio::spawn(async move {
+ compio::runtime::spawn(async move {
let _ = log_writer.fsync().await;
});
} else {
@@ -314,7 +314,7 @@ impl Segment {
if let Some(index_writer) = self.index_writer.take() {
//TODO: Fixme not sure whether we should spawn a task here.
- monoio::spawn(async move {
+ compio::runtime::spawn(async move {
let _ = index_writer.fsync().await;
drop(index_writer)
});
diff --git a/core/server/src/streaming/streams/storage.rs
b/core/server/src/streaming/streams/storage.rs
index d9848f03..b2b9672c 100644
--- a/core/server/src/streaming/streams/storage.rs
+++ b/core/server/src/streaming/streams/storage.rs
@@ -23,12 +23,12 @@ use crate::streaming::streams::COMPONENT;
use crate::streaming::streams::stream::Stream;
use crate::streaming::topics::topic::Topic;
use ahash::AHashSet;
+use compio::fs;
+use compio::fs::create_dir_all;
use error_set::ErrContext;
use futures::future::join_all;
use iggy_common::IggyError;
use iggy_common::IggyTimestamp;
-use monoio::fs;
-use monoio::fs::create_dir_all;
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::sync::Arc;
@@ -157,7 +157,7 @@ impl StreamStorage for FileStreamStorage {
for mut topic in unloaded_topics {
let loaded_topics = loaded_topics.clone();
let topic_state = state.topics.remove(&topic.topic_id).unwrap();
- let load_topic = monoio::spawn(async move {
+ let load_topic = compio::runtime::spawn(async move {
match topic.load(topic_state).await {
Ok(_) => loaded_topics.lock().await.push(topic),
Err(error) => error!(
diff --git a/core/server/src/streaming/topics/storage.rs
b/core/server/src/streaming/topics/storage.rs
index 8d92b9d1..f7a1fe38 100644
--- a/core/server/src/streaming/topics/storage.rs
+++ b/core/server/src/streaming/topics/storage.rs
@@ -25,13 +25,13 @@ use crate::streaming::topics::consumer_group::ConsumerGroup;
use crate::streaming::topics::topic::Topic;
use ahash::AHashSet;
use anyhow::Context;
+use compio::fs;
+use compio::fs::create_dir_all;
use error_set::ErrContext;
use futures::future::join_all;
use iggy_common::IggyError;
use iggy_common::locking::IggyRwLock;
use iggy_common::locking::IggySharedMutFn;
-use monoio::fs;
-use monoio::fs::create_dir_all;
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::sync::Arc;
@@ -188,7 +188,7 @@ impl TopicStorage for FileTopicStorage {
for mut partition in unloaded_partitions {
let loaded_partitions = loaded_partitions.clone();
let partition_state =
state.partitions.remove(&partition.partition_id).unwrap();
- let load_partition = monoio::spawn(async move {
+ let load_partition = compio::runtime::spawn(async move {
match partition.load(partition_state).await {
Ok(_) => {
loaded_partitions.lock().await.push(partition);
diff --git a/core/server/src/streaming/utils/file.rs
b/core/server/src/streaming/utils/file.rs
index 7a5bb524..de00aa5e 100644
--- a/core/server/src/streaming/utils/file.rs
+++ b/core/server/src/streaming/utils/file.rs
@@ -16,7 +16,7 @@
* under the License.
*/
-use monoio::fs::{File, OpenOptions, remove_file};
+use compio::fs::{File, OpenOptions, remove_file};
use std::path::Path;
pub fn open_std(path: &str) -> Result<std::fs::File, std::io::Error> {
@@ -27,8 +27,14 @@ pub async fn open(path: &str) -> Result<File,
std::io::Error> {
OpenOptions::new().read(true).open(path).await
}
-pub async fn append(path: &str) -> Result<File, std::io::Error> {
- OpenOptions::new().read(true).append(true).open(path).await
+pub async fn append(path: &str) -> Result<(File, u64), std::io::Error> {
+ let file = OpenOptions::new()
+ .create(true)
+ .write(true)
+ .open(path)
+ .await?;
+ let position = file.metadata().await?.len();
+ Ok((file, position))
}
pub async fn overwrite(path: &str) -> Result<File, std::io::Error> {
@@ -45,7 +51,7 @@ pub async fn remove(path: &str) -> Result<(), std::io::Error>
{
}
pub async fn rename(old_path: &str, new_path: &str) -> Result<(),
std::io::Error> {
- monoio::fs::rename(Path::new(old_path), Path::new(new_path)).await
+ compio::fs::rename(Path::new(old_path), Path::new(new_path)).await
}
pub async fn exists(path: &str) -> Result<bool, std::io::Error> {
diff --git a/core/server/src/streaming/utils/pooled_buffer.rs
b/core/server/src/streaming/utils/pooled_buffer.rs
index a8664325..39ef25be 100644
--- a/core/server/src/streaming/utils/pooled_buffer.rs
+++ b/core/server/src/streaming/utils/pooled_buffer.rs
@@ -18,7 +18,7 @@
use super::memory_pool::{BytesMutExt, memory_pool};
use bytes::{Buf, BufMut, BytesMut};
-use monoio::buf::{IoBuf, IoBufMut};
+use compio::buf::{IoBuf, IoBufMut, SetBufInit};
use std::ops::{Deref, DerefMut};
#[derive(Debug)]
@@ -217,26 +217,32 @@ impl Buf for PooledBuffer {
}
}
-unsafe impl IoBufMut for PooledBuffer {
- fn write_ptr(&mut self) -> *mut u8 {
- self.inner.write_ptr()
- }
-
- fn bytes_total(&mut self) -> usize {
- self.inner.bytes_total()
+impl SetBufInit for PooledBuffer {
+ unsafe fn set_buf_init(&mut self, len: usize) {
+ if self.inner.len() <= len {
+ unsafe {
+ self.inner.set_len(len);
+ }
+ }
}
+}
- unsafe fn set_init(&mut self, pos: usize) {
- unsafe { self.inner.set_init(pos) }
+unsafe impl IoBufMut for PooledBuffer {
+ fn as_buf_mut_ptr(&mut self) -> *mut u8 {
+ self.inner.as_buf_mut_ptr()
}
}
unsafe impl IoBuf for PooledBuffer {
- fn read_ptr(&self) -> *const u8 {
- self.inner.read_ptr()
+ fn as_buf_ptr(&self) -> *const u8 {
+ self.inner.as_buf_ptr()
+ }
+
+ fn buf_len(&self) -> usize {
+ self.inner.len()
}
- fn bytes_init(&self) -> usize {
- self.inner.bytes_init()
+ fn buf_capacity(&self) -> usize {
+ self.inner.capacity()
}
}
diff --git a/core/server/src/tcp/connection_handler.rs
b/core/server/src/tcp/connection_handler.rs
index 418af387..9c6803b8 100644
--- a/core/server/src/tcp/connection_handler.rs
+++ b/core/server/src/tcp/connection_handler.rs
@@ -62,6 +62,7 @@ pub(crate) async fn handle_connection(
if error.as_code() ==
IggyError::ConnectionClosed.as_code() {
return Err(ConnectionError::from(error));
} else {
+ error!("got error: {:?}", error);
sender.send_error_response(error).await?;
continue;
}
diff --git a/core/server/src/tcp/sender.rs b/core/server/src/tcp/sender.rs
index 2f445d4f..2c0ecfa9 100644
--- a/core/server/src/tcp/sender.rs
+++ b/core/server/src/tcp/sender.rs
@@ -16,30 +16,37 @@
* under the License.
*/
-use bytes::BytesMut;
-use iggy_common::IggyError;
-use monoio::{
- buf::IoBufMut,
- io::{AsyncReadRent, AsyncReadRentExt, AsyncWriteRent, AsyncWriteRentExt},
+use bytes::{Bytes, BytesMut};
+use compio::{
+ BufResult,
+ buf::{IoBuf, IoBufMut},
+ io::{AsyncRead, AsyncReadAtExt, AsyncReadExt, AsyncWriteExt},
};
+use iggy_common::IggyError;
use nix::libc;
use std::io::IoSlice;
-use tracing::debug;
+use tracing::{debug, error};
+
+use crate::streaming::utils::PooledBuffer;
const STATUS_OK: &[u8] = &[0; 4];
pub(crate) async fn read<T, B>(stream: &mut T, buffer: B) -> (Result<usize,
IggyError>, B)
where
- T: AsyncReadRent + AsyncWriteRent + Unpin,
+ T: AsyncReadExt + AsyncWriteExt + Unpin,
B: IoBufMut,
{
- match stream.read_exact(buffer).await {
- (Ok(0), buffer) => (Err(IggyError::ConnectionClosed), buffer),
- (Ok(read_bytes), buffer) => (Ok(read_bytes), buffer),
+ let BufResult(result, buffer) = stream.read_exact(buffer).await;
+ match (result, buffer) {
+ (Ok(_), buffer) => (Ok(buffer.buf_len()), buffer),
+ // TODO: How to handle this ?(Ok(0), buffer) =>
(Err(IggyError::ConnectionClosed), buffer),
+ // `read_exact` from compio doesn't return how many bytes it read.
(Err(error), buffer) => {
if error.kind() == std::io::ErrorKind::UnexpectedEof {
+ //error!("Got some error tho.. {}", error);
(Err(IggyError::ConnectionClosed), buffer)
} else {
+ //error!("Got some other error tho.. {}", error);
(Err(IggyError::TcpError), buffer)
}
}
@@ -48,14 +55,14 @@ where
pub(crate) async fn send_empty_ok_response<T>(stream: &mut T) -> Result<(),
IggyError>
where
- T: AsyncReadRent + AsyncWriteRent + Unpin,
+ T: AsyncReadExt + AsyncWriteExt + Unpin,
{
send_ok_response(stream, &[]).await
}
pub(crate) async fn send_ok_response<T>(stream: &mut T, payload: &[u8]) ->
Result<(), IggyError>
where
- T: AsyncReadRent + AsyncWriteRent + Unpin,
+ T: AsyncReadExt + AsyncWriteExt + Unpin,
{
send_response(stream, STATUS_OK, payload).await
}
@@ -63,10 +70,10 @@ where
pub(crate) async fn send_ok_response_vectored<T>(
stream: &mut T,
length: &[u8],
- slices: Vec<libc::iovec>,
+ slices: Vec<PooledBuffer>,
) -> Result<(), IggyError>
where
- T: AsyncReadRentExt + AsyncWriteRentExt + Unpin,
+ T: AsyncReadExt + AsyncWriteExt + Unpin,
{
send_response_vectored(stream, STATUS_OK, length, slices).await
}
@@ -76,7 +83,7 @@ pub(crate) async fn send_error_response<T>(
error: IggyError,
) -> Result<(), IggyError>
where
- T: AsyncReadRent + AsyncWriteRent + Unpin,
+ T: AsyncReadExt + AsyncWriteExt + Unpin,
{
send_response(stream, &error.as_code().to_le_bytes(), &[]).await
}
@@ -87,7 +94,7 @@ pub(crate) async fn send_response<T>(
payload: &[u8],
) -> Result<(), IggyError>
where
- T: AsyncReadRent + AsyncWriteRent + Unpin,
+ T: AsyncReadExt + AsyncWriteExt + Unpin,
{
debug!(
"Sending response of len: {} with status: {:?}...",
@@ -108,32 +115,25 @@ pub(crate) async fn send_response_vectored<T>(
stream: &mut T,
status: &[u8],
length: &[u8],
- mut slices: Vec<libc::iovec>,
+ mut slices: Vec<PooledBuffer>,
) -> Result<(), IggyError>
where
- T: AsyncReadRentExt + AsyncWriteRentExt + Unpin,
+ T: AsyncReadExt + AsyncWriteExt + Unpin,
{
+ let resp_status = u32::from_le_bytes(status.try_into().unwrap());
debug!(
"Sending vectored response of len: {} with status: {:?}...",
slices.len(),
- status
+ resp_status
);
- let prefix = [
- libc::iovec {
- iov_base: status.as_ptr() as _,
- iov_len: status.len(),
- },
- libc::iovec {
- iov_base: length.as_ptr() as _,
- iov_len: length.len(),
- },
- ];
- slices.splice(0..0, prefix);
+ let status = PooledBuffer::from(status);
+ let length = PooledBuffer::from(length);
+ slices.splice(0..0, [status, length]);
stream
.write_vectored_all(slices)
.await
.0
.map_err(|_| IggyError::TcpError)?;
- debug!("Sent response with status: {:?}", status);
+ debug!("Sent response with status: {:?}", resp_status);
Ok(())
}
diff --git a/core/server/src/tcp/tcp_listener.rs
b/core/server/src/tcp/tcp_listener.rs
index 4f528016..b541305b 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -29,13 +29,19 @@ use std::rc::Rc;
use std::time::Duration;
use tracing::{error, info};
-pub async fn start(server_name: &'static str, addr: SocketAddr, socket:
Socket, shard: Rc<IggyShard>) -> Result<(), IggyError> {
+pub async fn start(
+ server_name: &'static str,
+ addr: SocketAddr,
+ socket: Socket,
+ shard: Rc<IggyShard>,
+) -> Result<(), IggyError> {
socket
.bind(&addr.into())
.expect("Failed to bind TCP listener");
socket.listen(1024).unwrap();
+ //TODO: Fix it later to use `TcpOpts` from compio.
let listener: std::net::TcpListener = socket.into();
- let listener = monoio::net::TcpListener::from_std(listener).unwrap();
+ let listener = compio::net::TcpListener::from_std(listener).unwrap();
info!("{server_name} server has started on: {:?}", addr);
loop {
@@ -44,7 +50,7 @@ pub async fn start(server_name: &'static str, addr:
SocketAddr, socket: Socket,
if shard.is_shutting_down() {
return;
}
- monoio::time::sleep(Duration::from_millis(100)).await;
+ compio::time::sleep(Duration::from_millis(100)).await;
}
};
diff --git a/core/server/src/tcp/tcp_sender.rs
b/core/server/src/tcp/tcp_sender.rs
index cfa25aad..d0a1c27c 100644
--- a/core/server/src/tcp/tcp_sender.rs
+++ b/core/server/src/tcp/tcp_sender.rs
@@ -17,14 +17,15 @@
*/
use crate::binary::sender::Sender;
+use crate::streaming::utils::PooledBuffer;
use crate::tcp::COMPONENT;
use crate::{server_error::ServerError, tcp::sender};
use bytes::BytesMut;
+use compio::buf::{IoBuf, IoBufMut};
+use compio::io::AsyncWrite;
+use compio::net::TcpStream;
use error_set::ErrContext;
use iggy_common::IggyError;
-use monoio::buf::IoBufMut;
-use monoio::io::AsyncWriteRent;
-use monoio::net::TcpStream;
use nix::libc;
#[derive(Debug)]
@@ -62,7 +63,7 @@ impl Sender for TcpSender {
async fn send_ok_response_vectored(
&mut self,
length: &[u8],
- slices: Vec<libc::iovec>,
+ slices: Vec<PooledBuffer>,
) -> Result<(), IggyError> {
sender::send_ok_response_vectored(&mut self.stream, length,
slices).await
}
diff --git a/core/server/src/tcp/tcp_tls_listener.rs
b/core/server/src/tcp/tcp_tls_listener.rs
index 2c81eb60..00d7b5c4 100644
--- a/core/server/src/tcp/tcp_tls_listener.rs
+++ b/core/server/src/tcp/tcp_tls_listener.rs
@@ -24,7 +24,6 @@ use crate::streaming::clients::client_manager::Transport;
use crate::tcp::connection_handler::{handle_connection, handle_error};
use futures::FutureExt;
use iggy_common::IggyError;
-use monoio_rustls::TlsAcceptor;
use rustls::ServerConfig;
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
use rustls_pemfile::{certs, private_key};
@@ -42,106 +41,108 @@ pub(crate) async fn start(
socket: Socket,
shard: Rc<IggyShard>,
) -> Result<(), IggyError> {
- let _ =
rustls::crypto::aws_lc_rs::default_provider().install_default();
- let config = &shard.config.tcp.tls;
-
- let (certs, key) =
- if config.self_signed &&
!std::path::Path::new(&config.cert_file).exists() {
- info!("Generating self-signed certificate for TCP TLS server");
- generate_self_signed_cert()
- .unwrap_or_else(|e| panic!("Failed to generate self-signed
certificate: {e}"))
- } else {
- load_certificates(&config.cert_file, &config.key_file)
- .unwrap_or_else(|e| panic!("Failed to load certificates:
{e}"))
- };
-
- let server_config = ServerConfig::builder()
- .with_no_client_auth()
- .with_single_cert(certs, key)
- .unwrap_or_else(|e| panic!("Unable to create TLS server config:
{e}"));
-
- let acceptor = TlsAcceptor::from(Arc::new(server_config));
-
- socket
- .bind(&addr.into())
- .unwrap_or_else(|e| panic!("Unable to bind socket to address
'{addr}': {e}",));
-
- let listener: std::net::TcpListener = socket.into();
- let listener = monoio::net::TcpListener::from_std(listener).unwrap();
- info!("{server_name} server has started on: {:?}", addr);
-
- loop {
- let shutdown_check = async {
- loop {
- if shard.is_shutting_down() {
- return;
- }
- monoio::time::sleep(Duration::from_millis(100)).await;
+ /*
+ let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
+ let config = &shard.config.tcp.tls;
+
+ let (certs, key) =
+ if config.self_signed &&
!std::path::Path::new(&config.cert_file).exists() {
+ info!("Generating self-signed certificate for TCP TLS server");
+ generate_self_signed_cert()
+ .unwrap_or_else(|e| panic!("Failed to generate self-signed
certificate: {e}"))
+ } else {
+ load_certificates(&config.cert_file, &config.key_file)
+ .unwrap_or_else(|e| panic!("Failed to load certificates: {e}"))
+ };
+
+ let server_config = ServerConfig::builder()
+ .with_no_client_auth()
+ .with_single_cert(certs, key)
+ .unwrap_or_else(|e| panic!("Unable to create TLS server config: {e}"));
+
+ let acceptor = TlsAcceptor::from(Arc::new(server_config));
+
+ socket
+ .bind(&addr.into())
+ .unwrap_or_else(|e| panic!("Unable to bind socket to address '{addr}':
{e}",));
+
+ let listener: std::net::TcpListener = socket.into();
+ let listener = monoio::net::TcpListener::from_std(listener).unwrap();
+ info!("{server_name} server has started on: {:?}", addr);
+
+ loop {
+ let shutdown_check = async {
+ loop {
+ if shard.is_shutting_down() {
+ return;
}
- };
+ monoio::time::sleep(Duration::from_millis(100)).await;
+ }
+ };
- let accept_future = listener.accept();
- futures::select! {
- _ = shutdown_check.fuse() => {
- info!("TCP TLS server detected shutdown flag, no longer
accepting connections");
- break;
- }
- result = accept_future.fuse() => {
- match result {
- Ok((stream, address)) => {
- if shard.is_shutting_down() {
- info!("Rejecting new TLS connection from {}
during shutdown", address);
- continue;
- }
- let shard_clone = shard.clone();
- info!("Accepted new TCP TLS connection: {}",
address);
- let transport = Transport::Tcp;
- let session = shard_clone.add_client(&address,
transport);
- //TODO: Those can be shared with other shards.
- shard_clone.add_active_session(session.clone());
- // Broadcast session to all shards.
- let event = ShardEvent::NewSession { address,
transport };
- // TODO: Fixme look inside of
broadcast_event_to_all_shards method.
- let _responses =
shard_clone.broadcast_event_to_all_shards(event.into());
-
- let client_id = session.client_id;
- info!("Created new session: {session}");
- let acceptor = acceptor.clone();
-
- let conn_stop_receiver =
shard_clone.task_registry.add_connection(client_id);
-
- let shard_for_conn = shard_clone.clone();
- shard_clone.task_registry.spawn_tracked(async move
{
- match acceptor.accept(stream).await {
- Ok(tls_stream) => {
- let mut sender =
SenderKind::get_tcp_tls_sender(tls_stream.into());
- if let Err(error) =
handle_connection(&session, &mut sender, &shard_for_conn,
conn_stop_receiver).await {
- handle_error(error);
- }
-
shard_for_conn.task_registry.remove_connection(&client_id);
-
- if let Err(error) =
sender.shutdown().await {
- error!(
- "Failed to shutdown TCP TLS
stream for client: {client_id}, address: {address}. {error}"
- );
- } else {
- info!(
- "Successfully closed TCP TLS
stream for client: {client_id}, address: {address}."
- );
- }
+ let accept_future = listener.accept();
+ futures::select! {
+ _ = shutdown_check.fuse() => {
+ info!("TCP TLS server detected shutdown flag, no longer
accepting connections");
+ break;
+ }
+ result = accept_future.fuse() => {
+ match result {
+ Ok((stream, address)) => {
+ if shard.is_shutting_down() {
+ info!("Rejecting new TLS connection from {} during
shutdown", address);
+ continue;
+ }
+ let shard_clone = shard.clone();
+ info!("Accepted new TCP TLS connection: {}", address);
+ let transport = Transport::Tcp;
+ let session = shard_clone.add_client(&address,
transport);
+ //TODO: Those can be shared with other shards.
+ shard_clone.add_active_session(session.clone());
+ // Broadcast session to all shards.
+ let event = ShardEvent::NewSession { address,
transport };
+ // TODO: Fixme look inside of
broadcast_event_to_all_shards method.
+ let _responses =
shard_clone.broadcast_event_to_all_shards(event.into());
+
+ let client_id = session.client_id;
+ info!("Created new session: {session}");
+ let acceptor = acceptor.clone();
+
+ let conn_stop_receiver =
shard_clone.task_registry.add_connection(client_id);
+
+ let shard_for_conn = shard_clone.clone();
+ shard_clone.task_registry.spawn_tracked(async move {
+ match acceptor.accept(stream).await {
+ Ok(tls_stream) => {
+ let mut sender =
SenderKind::get_tcp_tls_sender(tls_stream.into());
+ if let Err(error) =
handle_connection(&session, &mut sender, &shard_for_conn,
conn_stop_receiver).await {
+ handle_error(error);
}
- Err(e) => {
- error!("Failed to accept TLS
connection from '{address}': {e}");
-
shard_for_conn.task_registry.remove_connection(&client_id);
+
shard_for_conn.task_registry.remove_connection(&client_id);
+
+ if let Err(error) =
sender.shutdown().await {
+ error!(
+ "Failed to shutdown TCP TLS stream
for client: {client_id}, address: {address}. {error}"
+ );
+ } else {
+ info!(
+ "Successfully closed TCP TLS
stream for client: {client_id}, address: {address}."
+ );
}
}
- });
- }
- Err(error) => error!("Unable to accept TCP TLS socket.
{error}"),
+ Err(e) => {
+ error!("Failed to accept TLS connection
from '{address}': {e}");
+
shard_for_conn.task_registry.remove_connection(&client_id);
+ }
+ }
+ });
}
+ Err(error) => error!("Unable to accept TCP TLS socket.
{error}"),
}
}
}
+ }
+ */
Ok(())
}
diff --git a/core/server/src/tcp/tcp_tls_sender.rs
b/core/server/src/tcp/tcp_tls_sender.rs
index b214e5a6..8dbb3972 100644
--- a/core/server/src/tcp/tcp_tls_sender.rs
+++ b/core/server/src/tcp/tcp_tls_sender.rs
@@ -17,41 +17,46 @@
*/
use crate::binary::sender::Sender;
+use crate::streaming::utils::PooledBuffer;
use crate::tcp::COMPONENT;
use crate::{server_error::ServerError, tcp::sender};
use bytes::BytesMut;
+use compio::buf::IoBufMut;
+use compio::io::AsyncWrite;
+use compio::net::TcpStream;
use error_set::ErrContext;
use iggy_common::IggyError;
-use monoio::buf::IoBufMut;
-use monoio::io::AsyncWriteRent;
-use monoio::net::TcpStream;
-use monoio_rustls::ServerTlsStream;
//use tokio_rustls::server::TlsStream;
use nix::libc;
#[derive(Debug)]
pub struct TcpTlsSender {
- pub(crate) stream: ServerTlsStream<TcpStream>,
+ pub(crate) stream: TcpStream,
}
impl Sender for TcpTlsSender {
async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<usize,
IggyError>, B) {
+ todo!();
sender::read(&mut self.stream, buffer).await
}
async fn send_empty_ok_response(&mut self) -> Result<(), IggyError> {
+ todo!();
sender::send_empty_ok_response(&mut self.stream).await
}
async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(),
IggyError> {
+ todo!();
sender::send_ok_response(&mut self.stream, payload).await
}
async fn send_error_response(&mut self, error: IggyError) -> Result<(),
IggyError> {
+ todo!();
sender::send_error_response(&mut self.stream, error).await
}
async fn shutdown(&mut self) -> Result<(), ServerError> {
+ todo!();
self.stream
.shutdown()
.await
@@ -64,8 +69,9 @@ impl Sender for TcpTlsSender {
async fn send_ok_response_vectored(
&mut self,
length: &[u8],
- slices: Vec<libc::iovec>,
+ slices: Vec<PooledBuffer>,
) -> Result<(), IggyError> {
+ todo!();
sender::send_ok_response_vectored(&mut self.stream, length,
slices).await
}
}