This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch io_uring_tpc_direct_io_socket_transfer in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 9e984026bf01c1a77b60041a166bc4c65d24b7b3 Author: Hubert Gruszecki <[email protected]> AuthorDate: Mon Jul 7 21:33:25 2025 +0200 feat(io_uring): implement directio and aligned PooledBuffer --- Cargo.lock | 1018 ++++---------------- core/server/Cargo.toml | 1 + .../handlers/messages/send_messages_handler.rs | 42 +- core/server/src/binary/handlers/utils.rs | 3 +- core/server/src/binary/sender.rs | 7 +- core/server/src/main.rs | 11 + core/server/src/quic/quic_sender.rs | 2 +- core/server/src/shard/mod.rs | 28 +- core/server/src/shard/system/storage.rs | 2 +- core/server/src/streaming/segments/direct_file.rs | 443 +++++++++ .../src/streaming/segments/indexes/index_reader.rs | 27 +- .../src/streaming/segments/indexes/index_writer.rs | 101 +- .../streaming/segments/messages/messages_reader.rs | 26 +- .../streaming/segments/messages/messages_writer.rs | 113 +-- core/server/src/streaming/segments/messages/mod.rs | 28 +- core/server/src/streaming/segments/mod.rs | 2 + core/server/src/streaming/segments/segment.rs | 40 +- .../streaming/segments/types/messages_batch_mut.rs | 15 +- .../src/streaming/segments/writing_messages.rs | 12 +- core/server/src/streaming/utils/memory_pool.rs | 64 +- core/server/src/streaming/utils/mod.rs | 2 +- core/server/src/streaming/utils/pooled_buffer.rs | 152 +-- core/server/src/tcp/connection_handler.rs | 13 +- core/server/src/tcp/sender.rs | 18 +- core/server/src/tcp/tcp_sender.rs | 6 +- core/server/src/tcp/tcp_tls_sender.rs | 2 +- 26 files changed, 977 insertions(+), 1201 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2a32b031d..20d0a61bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -308,12 +308,12 @@ dependencies = [ ] [[package]] -name = "aligned-array" -version = "1.0.1" +name = "aligned-vec" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e05c92d086290f52938013f6242ac62bf7d401fab8ad36798a609faa65c3fd2c" +checksum = "dc890384c8602f339876ded803c97ad529f3842aba97f6392b3dba0dd171769b" dependencies = [ - "generic-array", + "equator", ] [[package]] @@ -619,15 +619,6 @@ dependencies = [ "tokio-util", ] -[[package]] -name = "atoi" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" -dependencies = [ - "num-traits", -] - [[package]] name = "atomic" version = "0.6.1" @@ -652,12 +643,44 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" +[[package]] +name = "attohttpc" +version = "0.28.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07a9b245ba0739fc90935094c29adbaee3f977218b5fb95e822e261cda7f56a3" +dependencies = [ + "http 1.3.1", + "log", + "rustls", + "serde", + "serde_json", + "url", + "webpki-roots 0.26.11", +] + [[package]] name = "autocfg" version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "aws-creds" +version = "0.38.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba912106484991c456adb3364338a2534d0818bd9374b324b608074e3b55f581" +dependencies = [ + "attohttpc", + "home", + "log", + "quick-xml 0.32.0", + "rust-ini", + "serde", + "thiserror 1.0.69", + "time", + "url", +] + [[package]] name = "aws-lc-rs" version = "1.13.1" @@ -681,6 +704,15 @@ dependencies = [ "fs_extra", ] +[[package]] +name = "aws-region" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73ae4ae7c45238b60af0a3b27ef2fcc7bd5b8fdcd8a6d679919558b40d3eff7a" +dependencies = [ + "thiserror 1.0.69", +] + [[package]] name = "axum" version = "0.7.9" @@ -831,12 +863,6 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" -[[package]] -name = "base64ct" -version = "1.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba" - [[package]] name = "bcrypt" version = "0.17.0" @@ -1285,6 +1311,15 @@ dependencies = [ "thiserror 2.0.12", ] +[[package]] +name = "castaway" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0abae9be0aaf9ea96a3b1b8b1b55c602ca751eba1b1500220cea4ecbafe7c0d5" +dependencies = [ + "rustversion", +] + [[package]] name = "cc" version = "1.2.27" @@ -1492,23 +1527,16 @@ dependencies = [ ] [[package]] -name = "compio" -version = "0.15.0" +name = "compact_str" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "713c6293af093c202ad318e8f7bdc1de1a36d7a793bb77f7fc6bd6f1788659a9" +checksum = "f86b9c4c00838774a6d902ef931eff7470720c51d90c2e32cfe15dc304737b3f" dependencies = [ - "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-dispatcher", - "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-fs 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-io 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-log 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-net 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-process", - "compio-quic", - "compio-runtime 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-signal", - "compio-tls 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", + "castaway", + "cfg-if", + "itoa", + "ryu", + "static_assertions", ] [[package]] @@ -1516,26 +1544,15 @@ name = "compio" version = "0.15.0" source = "git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be#fe4243f0b6811ebc325afd081c9b087b4d9817be" dependencies = [ - "compio-buf 0.6.0 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", - "compio-driver 0.8.1 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", - "compio-fs 0.8.0 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", - "compio-io 0.7.0 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", - "compio-log 0.1.0 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", + "compio-buf", + "compio-driver", + "compio-fs", + "compio-io", + "compio-log", "compio-macros", - "compio-net 0.8.0 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", - "compio-runtime 0.8.1 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", - "compio-tls 0.6.0 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", -] - -[[package]] -name = "compio-buf" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ce94a45a47ef8c0e3f44084fe67c8effc25e7ac1de6de2ee1a29a59e6c6ba8e" -dependencies = [ - "arrayvec", - "bytes", - "libc", + "compio-net", + "compio-runtime", + "compio-tls", ] [[package]] @@ -1548,43 +1565,6 @@ dependencies = [ "libc", ] -[[package]] -name = "compio-dispatcher" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0cdf8c613be826be410d8744ab30acc49cc5134a78e2aa25efae9efa44bed6a7" -dependencies = [ - "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-runtime 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", - "flume", - "futures-channel", -] - -[[package]] -name = "compio-driver" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "737212fe00b4af769f7e8f156c25ffafd5888d4d21834e100ea068dea1086ef8" -dependencies = [ - "aligned-array", - "cfg-if", - "cfg_aliases", - "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-log 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "crossbeam-channel", - "crossbeam-queue", - "futures-util", - "io-uring", - "io_uring_buf_ring", - "libc", - "once_cell", - "paste", - "polling", - "slab", - "socket2 0.5.10", - "windows-sys 0.52.0", -] - [[package]] name = "compio-driver" version = "0.8.1" @@ -1592,8 +1572,8 @@ source = "git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd dependencies = [ "cfg-if", "cfg_aliases", - "compio-buf 0.6.0 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", - "compio-log 0.1.0 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", + "compio-buf", + "compio-log", "crossbeam-channel", "crossbeam-queue", "futures-util", @@ -1608,24 +1588,6 @@ dependencies = [ "windows-sys 0.60.2", ] -[[package]] -name = "compio-fs" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9bcf65e631d521c666bca25595f8e5c78173e96f0b3b61f0a7d93f31d9661d32" -dependencies = [ - "cfg-if", - "cfg_aliases", - "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-io 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-runtime 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", - "libc", - "os_pipe", - "widestring", - "windows-sys 0.52.0", -] - [[package]] name = "compio-fs" version = "0.8.0" @@ -1633,48 +1595,27 @@ source = "git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd dependencies = [ "cfg-if", "cfg_aliases", - "compio-buf 0.6.0 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", - "compio-driver 0.8.1 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", - "compio-io 0.7.0 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", - "compio-runtime 0.8.1 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", + "compio-buf", + "compio-driver", + "compio-io", + "compio-runtime", "libc", "os_pipe", "widestring", "windows-sys 0.60.2", ] -[[package]] -name = "compio-io" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2b05cc4142659f2c90b6e44c68568ff71c83c6fb9285aca686952250b914932" -dependencies = [ - "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", - "futures-util", - "paste", - "pin-project-lite", -] - [[package]] name = "compio-io" version = "0.7.0" source = "git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be#fe4243f0b6811ebc325afd081c9b087b4d9817be" dependencies = [ - "compio-buf 0.6.0 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", + "compio-buf", "futures-util", "paste", "pin-project-lite", ] -[[package]] -name = "compio-log" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc4e560213c1996b618da369b7c9109564b41af9033802ae534465c4ee4e132f" -dependencies = [ - "tracing", -] - [[package]] name = "compio-log" version = "0.1.0" @@ -1694,35 +1635,16 @@ dependencies = [ "syn 2.0.104", ] -[[package]] -name = "compio-net" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c1fabe3393bc0c3a0dca8e99a35bf97e42caa12bb3cc6bba83df04e28c9c142" -dependencies = [ - "cfg-if", - "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-io 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-runtime 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", - "either", - "libc", - "once_cell", - "socket2 0.5.10", - "widestring", - "windows-sys 0.52.0", -] - [[package]] name = "compio-net" version = "0.8.0" source = "git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be#fe4243f0b6811ebc325afd081c9b087b4d9817be" dependencies = [ "cfg-if", - "compio-buf 0.6.0 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", - "compio-driver 0.8.1 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", - "compio-io 0.7.0 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", - "compio-runtime 0.8.1 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", + "compio-buf", + "compio-driver", + "compio-io", + "compio-runtime", "either", "libc", "once_cell", @@ -1731,64 +1653,6 @@ dependencies = [ "windows-sys 0.60.2", ] -[[package]] -name = "compio-process" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3867cfe7b23eaae89ff815aba4fdde61cb6fd55f81fd368128300c6b7e645016" -dependencies = [ - "cfg-if", - "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-io 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-runtime 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", - "futures-util", - "windows-sys 0.52.0", -] - -[[package]] -name = "compio-quic" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f107e044329f1e171930801b09bfc6e764c5e171e45c7a3e382f98561da619a" -dependencies = [ - "cfg_aliases", - "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-io 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-log 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-net 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-runtime 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", - "flume", - "futures-util", - "libc", - "quinn-proto", - "rustc-hash 2.1.1", - "rustls", - "thiserror 2.0.12", - "windows-sys 0.52.0", -] - -[[package]] -name = "compio-runtime" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7df559e87b7ab05ba61c32619f6076dd5cc2daf5a8cb30cb9931fb355d20aff" -dependencies = [ - "async-task", - "cfg-if", - "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-log 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "crossbeam-queue", - "futures-util", - "libc", - "once_cell", - "scoped-tls", - "slab", - "socket2 0.5.10", - "windows-sys 0.52.0", -] - [[package]] name = "compio-runtime" version = "0.8.1" @@ -1796,9 +1660,9 @@ source = "git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd dependencies = [ "async-task", "cfg-if", - "compio-buf 0.6.0 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", - "compio-driver 0.8.1 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", - "compio-log 0.1.0 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", + "compio-buf", + "compio-driver", + "compio-log", "core_affinity", "crossbeam-queue", "futures-util", @@ -1810,40 +1674,13 @@ dependencies = [ "windows-sys 0.60.2", ] -[[package]] -name = "compio-signal" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03d2931880b03b33d4df7d2b8a008e93731366d185358c7442fc8d24d5f9c1bd" -dependencies = [ - "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-runtime 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", - "libc", - "once_cell", - "os_pipe", - "slab", - "windows-sys 0.52.0", -] - -[[package]] -name = "compio-tls" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "542bb0e0f6f65cb84bc09b7e052fa54f006d1ba228a8dfad6d7b9676defe7232" -dependencies = [ - "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", - "compio-io 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", - "rustls", -] - [[package]] name = "compio-tls" version = "0.6.0" source = "git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be#fe4243f0b6811ebc325afd081c9b087b4d9817be" dependencies = [ - "compio-buf 0.6.0 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", - "compio-io 0.7.0 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", + "compio-buf", + "compio-io", "rustls", ] @@ -1937,12 +1774,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "const-oid" -version = "0.9.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" - [[package]] name = "const-random" version = "0.1.18" @@ -2040,21 +1871,6 @@ dependencies = [ "libc", ] -[[package]] -name = "crc" -version = "3.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9710d3b3739c2e349eb44fe848ad0b7c8cb1e42bd87ee49371df2f7acaf3e675" -dependencies = [ - "crc-catalog", -] - -[[package]] -name = "crc-catalog" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" - [[package]] name = "crc32fast" version = "1.4.2" @@ -2259,46 +2075,6 @@ dependencies = [ "regex-syntax 0.7.5", ] -[[package]] -name = "cyper" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89b65af5073b4f53c9697b611b414042e71c6a14e11088438a67b1ef36f51ca2" -dependencies = [ - "async-stream", - "base64 0.22.1", - "compio 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", - "cyper-core", - "encoding_rs", - "futures-util", - "http 1.3.1", - "http-body-util", - "hyper", - "hyper-util", - "mime", - "send_wrapper", - "serde", - "serde_urlencoded", - "thiserror 2.0.12", - "url", -] - -[[package]] -name = "cyper-core" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6343deaa569c748860d9afefab636648e7e6f9abdfc26b3b9dde327170ae6b2b" -dependencies = [ - "cfg-if", - "compio 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", - "futures-util", - "hyper", - "hyper-util", - "rustls-platform-verifier 0.6.0", - "send_wrapper", - "tower-service", -] - [[package]] name = "darling" version = "0.20.11" @@ -2379,17 +2155,6 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da692b8d1080ea3045efaab14434d40468c3d8657e42abddfffca87b428f4c1b" -[[package]] -name = "der" -version = "0.7.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7c1832837b905bbfb5101e07cc24c8deddf52f93225eee6ead5f4d63d53ddcb" -dependencies = [ - "const-oid", - "pem-rfc7468", - "zeroize", -] - [[package]] name = "deranged" version = "0.4.0" @@ -2501,7 +2266,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", - "const-oid", "crypto-common", "subtle", ] @@ -2643,9 +2407,6 @@ name = "either" version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" -dependencies = [ - "serde", -] [[package]] name = "embedded-io" @@ -2710,10 +2471,30 @@ dependencies = [ ] [[package]] -name = "equivalent" -version = "1.0.2" +name = "equator" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" +checksum = "4711b213838dfee0117e3be6ac926007d7f433d7bbe33595975d4190cb07e6fc" +dependencies = [ + "equator-macro", +] + +[[package]] +name = "equator-macro" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44f23cf4b44bfce11a86ace86f8a73ffdec849c9fd00a386a53d278bd9e81fb3" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" [[package]] name = "err_trail" @@ -2758,17 +2539,6 @@ dependencies = [ "syn 2.0.104", ] -[[package]] -name = "etcetera" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943" -dependencies = [ - "cfg-if", - "home", - "windows-sys 0.48.0", -] - [[package]] name = "event-listener" version = "5.4.0" @@ -3042,17 +2812,6 @@ dependencies = [ "futures-util", ] -[[package]] -name = "futures-intrusive" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f" -dependencies = [ - "futures-core", - "lock_api", - "parking_lot 0.12.4", -] - [[package]] name = "futures-io" version = "0.3.31" @@ -3843,15 +3602,6 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" -[[package]] -name = "hkdf" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b5f8eb2ad728638ea2c7d47a21db23b7b58a72ed6a38256b8a1849f15fbbdf7" -dependencies = [ - "hmac", -] - [[package]] name = "hmac" version = "0.12.1" @@ -4219,7 +3969,6 @@ dependencies = [ "integration", "nonzero_lit", "rand 0.9.1", - "rayon", "serde", "sysinfo 0.35.2", "tokio", @@ -4329,43 +4078,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "iggy_connector_postgres_sink" -version = "0.1.0" -dependencies = [ - "async-trait", - "chrono", - "dashmap", - "futures", - "iggy_connector_sdk", - "once_cell", - "serde", - "simd-json", - "sqlx", - "tokio", - "tracing", -] - -[[package]] -name = "iggy_connector_postgres_source" -version = "0.1.0" -dependencies = [ - "async-trait", - "chrono", - "dashmap", - "futures", - "humantime", - "iggy_connector_sdk", - "once_cell", - "serde", - "serde_json", - "simd-json", - "sqlx", - "tokio", - "tracing", - "uuid", -] - [[package]] name = "iggy_connector_quickwit_sink" version = "0.1.0" @@ -4591,7 +4303,6 @@ dependencies = [ "async-trait", "bytes", "chrono", - "compio 0.15.0 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", "ctor", "derive_more 2.0.1", "env_logger", @@ -4869,9 +4580,6 @@ name = "lazy_static" version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" -dependencies = [ - "spin", -] [[package]] name = "lazycell" @@ -4968,12 +4676,6 @@ dependencies = [ "pkg-config", ] -[[package]] -name = "libm" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" - [[package]] name = "libmimalloc-sys" version = "0.1.43" @@ -4995,16 +4697,6 @@ dependencies = [ "redox_syscall 0.5.13", ] -[[package]] -name = "libsqlite3-sys" -version = "0.30.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" -dependencies = [ - "pkg-config", - "vcpkg", -] - [[package]] name = "libyml" version = "0.0.5" @@ -5184,15 +4876,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" [[package]] -name = "md-5" -version = "0.10.6" +name = "maybe-async" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +checksum = "5cf92c10c7e361d6b99666ec1c6f9805b0bea2c3bd8c78dc6fe98ac5bd78db11" dependencies = [ - "cfg-if", - "digest", + "proc-macro2", + "quote", + "syn 2.0.104", ] +[[package]] +name = "md5" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" + [[package]] name = "memchr" version = "2.7.5" @@ -5246,6 +4945,15 @@ dependencies = [ "unicase", ] +[[package]] +name = "minidom" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e394a0e3c7ccc2daea3dffabe82f09857b6b510cb25af87d54bf3e910ac1642d" +dependencies = [ + "rxml", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -5476,23 +5184,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "num-bigint-dig" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc84195820f291c7697304f3cbdadd1cb7199c0efc917ff5eafd71225c136151" -dependencies = [ - "byteorder", - "lazy_static", - "libm", - "num-integer", - "num-iter", - "num-traits", - "rand 0.8.5", - "smallvec", - "zeroize", -] - [[package]] name = "num-complex" version = "0.4.6" @@ -5561,7 +5252,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" dependencies = [ "autocfg", - "libm", ] [[package]] @@ -5996,15 +5686,6 @@ dependencies = [ "serde", ] -[[package]] -name = "pem-rfc7468" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412" -dependencies = [ - "base64ct", -] - [[package]] name = "percent-encoding" version = "2.3.1" @@ -6098,27 +5779,6 @@ dependencies = [ "thiserror 1.0.69", ] -[[package]] -name = "pkcs1" -version = "0.7.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f" -dependencies = [ - "der", - "pkcs8", - "spki", -] - -[[package]] -name = "pkcs8" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" -dependencies = [ - "der", - "spki", -] - [[package]] name = "pkg-config" version = "0.3.32" @@ -6463,9 +6123,19 @@ dependencies = [ [[package]] name = "quick-xml" -version = "0.37.5" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb" +checksum = "1d3a6e5838b60e0e8fa7a43f22ade549a37d61f8bdbe636d0d7816191de969c2" +dependencies = [ + "memchr", + "serde", +] + +[[package]] +name = "quick-xml" +version = "0.36.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7649a7b4df05aed9ea7ec6f628c67c9953a43869b8bc50929569b2999d443fe" dependencies = [ "memchr", "serde", @@ -6506,7 +6176,7 @@ dependencies = [ "rustc-hash 2.1.1", "rustls", "rustls-pki-types", - "rustls-platform-verifier 0.5.3", + "rustls-platform-verifier", "slab", "thiserror 2.0.12", "tinyvec", @@ -6794,12 +6464,14 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-rustls", + "tokio-util", "tower 0.5.2", "tower-http", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "webpki-roots 1.0.1", ] @@ -6911,26 +6583,6 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "afab94fb28594581f62d981211a9a4d53cc8130bbcbbb89a0440d9b8e81a7746" -[[package]] -name = "rsa" -version = "0.9.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78928ac1ed176a5ca1d17e578a1825f3d81ca54cf41053a592584b020cfd691b" -dependencies = [ - "const-oid", - "digest", - "num-bigint-dig", - "num-integer", - "num-traits", - "pkcs1", - "pkcs8", - "rand_core 0.6.4", - "signature", - "spki", - "subtle", - "zeroize", -] - [[package]] name = "rust-ini" version = "0.21.1" @@ -6942,6 +6594,40 @@ dependencies = [ "trim-in-place", ] +[[package]] +name = "rust-s3" +version = "0.36.0-beta.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7d6f3a3dd397743e8f344ffc80ea7137aee423983ae25b512e5332ad11362f" +dependencies = [ + "async-trait", + "aws-creds", + "aws-region", + "base64 0.22.1", + "bytes", + "cfg-if", + "futures", + "hex", + "hmac", + "http 1.3.1", + "log", + "maybe-async", + "md5", + "minidom", + "percent-encoding", + "quick-xml 0.36.2", + "reqwest", + "serde", + "serde_derive", + "serde_json", + "sha2", + "thiserror 1.0.69", + "time", + "tokio", + "tokio-stream", + "url", +] + [[package]] name = "rust_decimal" version = "1.37.2" @@ -7079,27 +6765,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "rustls-platform-verifier" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eda84358ed17f1f354cf4b1909ad346e6c7bc2513e8c40eb08e0157aa13a9070" -dependencies = [ - "core-foundation", - "core-foundation-sys", - "jni", - "log", - "once_cell", - "rustls", - "rustls-native-certs", - "rustls-platform-verifier-android", - "rustls-webpki", - "security-framework", - "security-framework-sys", - "webpki-root-certs 1.0.1", - "windows-sys 0.52.0", -] - [[package]] name = "rustls-platform-verifier-android" version = "0.1.1" @@ -7125,22 +6790,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a0d197bd2c9dc6e53b84da9556a69ba4cdfab8619eb41a8bd1cc2027a0f6b1d" [[package]] -name = "rusty-s3" -version = "0.7.0" +name = "rxml" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f51a5a6b15f25d3e10c068039ee13befb6110fcb36c2b26317bcbdc23484d96" +checksum = "65bc94b580d0f5a6b7a2d604e597513d3c673154b52ddeccd1d5c32360d945ee" dependencies = [ - "base64 0.22.1", - "hmac", - "md-5", - "percent-encoding", - "quick-xml", - "serde", - "serde_json", - "sha2", - "time", - "url", - "zeroize", + "bytes", + "rxml_validation", +] + +[[package]] +name = "rxml_validation" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "826e80413b9a35e9d33217b3dcac04cf95f6559d15944b93887a08be5496c4a4" +dependencies = [ + "compact_str", ] [[package]] @@ -7274,15 +6939,6 @@ dependencies = [ "serde", ] -[[package]] -name = "send_wrapper" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" -dependencies = [ - "futures-core", -] - [[package]] name = "serde" version = "1.0.219" @@ -7444,6 +7100,7 @@ name = "server" version = "0.5.0" dependencies = [ "ahash 0.8.12", + "aligned-vec", "anyhow", "async-channel", "async_zip", @@ -7455,11 +7112,10 @@ dependencies = [ "bytes", "chrono", "clap", - "compio 0.15.0 (git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)", + "compio", "console-subscriber", "crossbeam", "ctrlc", - "cyper", "dashmap", "derive_more 2.0.1", "dotenvy", @@ -7488,9 +7144,9 @@ dependencies = [ "quinn", "reqwest", "ring", + "rust-s3", "rustls", "rustls-pemfile", - "rusty-s3", "serde", "serde_with", "serial_test", @@ -7571,16 +7227,6 @@ dependencies = [ "libc", ] -[[package]] -name = "signature" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" -dependencies = [ - "digest", - "rand_core 0.6.4", -] - [[package]] name = "simd-adler32" version = "0.3.7" @@ -7637,9 +7283,6 @@ name = "smallvec" version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" -dependencies = [ - "serde", -] [[package]] name = "smart-default" @@ -7717,214 +7360,6 @@ dependencies = [ "lock_api", ] -[[package]] -name = "spki" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" -dependencies = [ - "base64ct", - "der", -] - -[[package]] -name = "sqlx" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fefb893899429669dcdd979aff487bd78f4064e5e7907e4269081e0ef7d97dc" -dependencies = [ - "sqlx-core", - "sqlx-macros", - "sqlx-mysql", - "sqlx-postgres", - "sqlx-sqlite", -] - -[[package]] -name = "sqlx-core" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee6798b1838b6a0f69c007c133b8df5866302197e404e8b6ee8ed3e3a5e68dc6" -dependencies = [ - "base64 0.22.1", - "bytes", - "chrono", - "crc", - "crossbeam-queue", - "either", - "event-listener", - "futures-core", - "futures-intrusive", - "futures-io", - "futures-util", - "hashbrown 0.15.4", - "hashlink", - "indexmap 2.10.0", - "log", - "memchr", - "once_cell", - "percent-encoding", - "rustls", - "serde", - "serde_json", - "sha2", - "smallvec", - "thiserror 2.0.12", - "tokio", - "tokio-stream", - "tracing", - "url", - "uuid", - "webpki-roots 0.26.11", -] - -[[package]] -name = "sqlx-macros" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2d452988ccaacfbf5e0bdbc348fb91d7c8af5bee192173ac3636b5fb6e6715d" -dependencies = [ - "proc-macro2", - "quote", - "sqlx-core", - "sqlx-macros-core", - "syn 2.0.104", -] - -[[package]] -name = "sqlx-macros-core" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19a9c1841124ac5a61741f96e1d9e2ec77424bf323962dd894bdb93f37d5219b" -dependencies = [ - "dotenvy", - "either", - "heck 0.5.0", - "hex", - "once_cell", - "proc-macro2", - "quote", - "serde", - "serde_json", - "sha2", - "sqlx-core", - "sqlx-mysql", - "sqlx-postgres", - "sqlx-sqlite", - "syn 2.0.104", - "tokio", - "url", -] - -[[package]] -name = "sqlx-mysql" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa003f0038df784eb8fecbbac13affe3da23b45194bd57dba231c8f48199c526" -dependencies = [ - "atoi", - "base64 0.22.1", - "bitflags 2.9.1", - "byteorder", - "bytes", - "chrono", - "crc", - "digest", - "dotenvy", - "either", - "futures-channel", - "futures-core", - "futures-io", - "futures-util", - "generic-array", - "hex", - "hkdf", - "hmac", - "itoa", - "log", - "md-5", - "memchr", - "once_cell", - "percent-encoding", - "rand 0.8.5", - "rsa", - "serde", - "sha1", - "sha2", - "smallvec", - "sqlx-core", - "stringprep", - "thiserror 2.0.12", - "tracing", - "uuid", - "whoami", -] - -[[package]] -name = "sqlx-postgres" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db58fcd5a53cf07c184b154801ff91347e4c30d17a3562a635ff028ad5deda46" -dependencies = [ - "atoi", - "base64 0.22.1", - "bitflags 2.9.1", - "byteorder", - "chrono", - "crc", - "dotenvy", - "etcetera", - "futures-channel", - "futures-core", - "futures-util", - "hex", - "hkdf", - "hmac", - "home", - "itoa", - "log", - "md-5", - "memchr", - "once_cell", - "rand 0.8.5", - "serde", - "serde_json", - "sha2", - "smallvec", - "sqlx-core", - "stringprep", - "thiserror 2.0.12", - "tracing", - "uuid", - "whoami", -] - -[[package]] -name = "sqlx-sqlite" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2d12fe70b2c1b4401038055f90f151b78208de1f9f89a7dbfd41587a10c3eea" -dependencies = [ - "atoi", - "chrono", - "flume", - "futures-channel", - "futures-core", - "futures-executor", - "futures-intrusive", - "futures-util", - "libsqlite3-sys", - "log", - "percent-encoding", - "serde", - "serde_urlencoded", - "sqlx-core", - "thiserror 2.0.12", - "tracing", - "url", - "uuid", -] - [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -7946,15 +7381,10 @@ dependencies = [ ] [[package]] -name = "stringprep" -version = "0.1.5" +name = "static_assertions" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1" -dependencies = [ - "unicode-bidi", - "unicode-normalization", - "unicode-properties", -] +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "strsim" @@ -8730,12 +8160,6 @@ version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" -[[package]] -name = "unicode-bidi" -version = "0.3.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5" - [[package]] name = "unicode-ident" version = "1.0.18" @@ -8748,21 +8172,6 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b09c83c3c29d37506a3e260c08c03743a6bb66a9cd432c6934ab501a190571f" -[[package]] -name = "unicode-normalization" -version = "0.1.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956" -dependencies = [ - "tinyvec", -] - -[[package]] -name = "unicode-properties" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0" - [[package]] name = "unicode-segmentation" version = "1.12.0" @@ -8987,12 +8396,6 @@ dependencies = [ "wit-bindgen-rt", ] -[[package]] -name = "wasite" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" - [[package]] name = "wasm-bindgen" version = "0.2.100" @@ -9064,6 +8467,19 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-streams" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasm-timer" version = "0.2.5" @@ -9148,16 +8564,6 @@ dependencies = [ "rustix 0.38.44", ] -[[package]] -name = "whoami" -version = "1.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6994d13118ab492c3c80c1f81928718159254c53c472bf9ce36f8dae4add02a7" -dependencies = [ - "redox_syscall 0.5.13", - "wasite", -] - [[package]] name = "wide" version = "0.7.33" diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index fa7fc48f0..2932faf09 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -36,6 +36,7 @@ mimalloc = ["dep:mimalloc"] [dependencies] ahash = { workspace = true } +aligned-vec = "0.6" anyhow = { workspace = true } async_zip = { workspace = true } axum = { workspace = true } diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs index bb536c61e..6d155bcac 100644 --- a/core/server/src/binary/handlers/messages/send_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs @@ -57,18 +57,19 @@ impl ServerCommandHandler for SendMessages { let total_payload_size = length as usize - std::mem::size_of::<u32>(); let metadata_len_field_size = std::mem::size_of::<u32>(); - let metadata_length_buffer = PooledBuffer::with_capacity(4); - let (result, metadata_len_buf) = sender.read(metadata_length_buffer.slice(0..4)).await; - let metadata_len_buf = metadata_len_buf.into_inner(); - result?; - let metadata_size = u32::from_le_bytes(metadata_len_buf[..].try_into().unwrap()); - - let metadata_buffer = PooledBuffer::with_capacity(metadata_size as usize); - let (result, metadata_buf) = sender + let mut metadata_length_buffer = PooledBuffer::with_capacity(4); + + let metadata_length_buffer = sender + .read(metadata_length_buffer.slice(0..4)) + .await? + .into_inner(); + let metadata_size = u32::from_le_bytes(metadata_length_buffer[0..4].try_into().unwrap()); + + let mut metadata_buffer = PooledBuffer::with_capacity(metadata_size as usize); + let metadata_buf = sender .read(metadata_buffer.slice(0..metadata_size as usize)) - .await; - result?; - let metadata_buf = metadata_buf.into_inner(); + .await? + .into_inner(); let mut element_size = 0; @@ -91,17 +92,20 @@ impl ServerCommandHandler for SendMessages { ); let indexes_size = messages_count as usize * INDEX_SIZE; - let indexes_buffer = PooledBuffer::with_capacity(indexes_size); - let (result, indexes_buffer) = sender.read(indexes_buffer.slice(0..indexes_size)).await; - result?; - let indexes_buffer = indexes_buffer.into_inner(); + let mut indexes_buffer = PooledBuffer::with_capacity(indexes_size + 512); // extra space for possible padding to not cause reallocations + let indexes_buffer = sender + .read(indexes_buffer.slice(0..indexes_size)) + .await? + .into_inner(); let messages_size = total_payload_size - metadata_size as usize - indexes_size - metadata_len_field_size; - let messages_buffer = PooledBuffer::with_capacity(messages_size); - let (result, messages_buffer) = sender.read(messages_buffer.slice(0..messages_size)).await; - result?; - let messages_buffer = messages_buffer.into_inner(); + + let mut messages_buffer = PooledBuffer::with_capacity(messages_size + 512); // extra space for possible padding to not cause reallocations + let messages_buffer = sender + .read(messages_buffer.slice(0..messages_size)) + .await? + .into_inner(); let indexes = IggyIndexesMut::from_bytes(indexes_buffer, 0); let batch = IggyMessagesBatchMut::from_indexes_and_messages( diff --git a/core/server/src/binary/handlers/utils.rs b/core/server/src/binary/handlers/utils.rs index 80bcede49..54411d912 100644 --- a/core/server/src/binary/handlers/utils.rs +++ b/core/server/src/binary/handlers/utils.rs @@ -32,8 +32,7 @@ pub async fn receive_and_validate( let buffer = if length == 0 { buffer } else { - let (result, buffer) = sender.read(buffer).await; - result?; + let buffer = sender.read(buffer).await?; buffer }; diff --git a/core/server/src/binary/sender.rs b/core/server/src/binary/sender.rs index 96bed3d5c..c6f1a8112 100644 --- a/core/server/src/binary/sender.rs +++ b/core/server/src/binary/sender.rs @@ -56,10 +56,7 @@ macro_rules! forward_async_methods { } pub trait Sender { - fn read<B: IoBufMut>( - &mut self, - buffer: B, - ) -> impl Future<Output = (Result<usize, IggyError>, B)>; + fn read<B: IoBufMut>(&mut self, buffer: B) -> impl Future<Output = Result<B, IggyError>>; fn send_empty_ok_response(&mut self) -> impl Future<Output = Result<(), IggyError>>; fn send_ok_response(&mut self, payload: &[u8]) -> impl Future<Output = Result<(), IggyError>>; fn send_ok_response_vectored( @@ -98,7 +95,7 @@ impl SenderKind { } forward_async_methods! { - async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<usize, IggyError>, B); + async fn read<B: IoBufMut>(&mut self, buffer: B) -> Result<B, IggyError>; async fn send_empty_ok_response(&mut self) -> Result<(), IggyError>; async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(), IggyError>; async fn send_ok_response_vectored(&mut self, length: &[u8], slices: Vec<PooledBuffer>) -> Result<(), IggyError>; diff --git a/core/server/src/main.rs b/core/server/src/main.rs index b1481c228..c9798aaae 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -58,6 +58,15 @@ use tracing::{error, info, instrument}; const COMPONENT: &str = "MAIN"; +fn thread_print_memory_pool_stats() { + std::thread::sleep(std::time::Duration::from_secs(5)); + let pool = server::streaming::utils::memory_pool(); + loop { + pool.log_stats(); + std::thread::sleep(std::time::Duration::from_secs(5)); + } +} + #[instrument(skip_all, name = "trace_start_server")] fn main() -> Result<(), ServerError> { let startup_timestamp = Instant::now(); @@ -312,6 +321,8 @@ fn main() -> Result<(), ServerError> { .expect("Error setting Ctrl-C handler"); */ + std::thread::spawn(thread_print_memory_pool_stats); + info!("Iggy server is running. Press Ctrl+C or send SIGTERM to shutdown."); for (idx, handle) in handles.into_iter().enumerate() { info!("Waiting for shard thread {} to complete...", idx); diff --git a/core/server/src/quic/quic_sender.rs b/core/server/src/quic/quic_sender.rs index 54bda604e..30c4a6a7f 100644 --- a/core/server/src/quic/quic_sender.rs +++ b/core/server/src/quic/quic_sender.rs @@ -37,7 +37,7 @@ pub struct QuicSender { } impl Sender for QuicSender { - async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<usize, IggyError>, B) { + async fn read<B: IoBufMut>(&mut self, buffer: B) -> Result<B, IggyError> { //TODO: Fixme // Not-so-nice code because quinn recv stream has different API for read_exact /* diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 43280c777..019358680 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -283,16 +283,6 @@ impl IggyShard { } async fn load_version(&self) -> Result<(), IggyError> { - async fn update_system_info( - storage: &Rc<SystemStorage>, - system_info: &mut SystemInfo, - version: &SemanticVersion, - ) -> Result<(), IggyError> { - system_info.update_version(version); - storage.info.save(system_info).await?; - Ok(()) - } - let current_version = &self.version; let mut system_info; let load_system_info = self.storage.info.load().await; @@ -301,7 +291,7 @@ impl IggyShard { if let IggyError::ResourceNotFound(_) = error { info!("System info not found, creating..."); system_info = SystemInfo::default(); - update_system_info(&self.storage, &mut system_info, current_version).await?; + Self::update_system_info(&self.storage, &mut system_info, current_version).await?; } else { return Err(error); } @@ -310,24 +300,34 @@ impl IggyShard { } info!("Loaded {system_info}."); - let loaded_version = SemanticVersion::from_str(&system_info.version.version)?; + let loaded_version = SemanticVersion::from_str(&system_info.version.version).unwrap(); if current_version.is_equal_to(&loaded_version) { info!("System version {current_version} is up to date."); } else if current_version.is_greater_than(&loaded_version) { info!( "System version {current_version} is greater than {loaded_version}, checking the available migrations..." ); - update_system_info(&self.storage, &mut system_info, current_version).await?; + Self::update_system_info(&self.storage, &mut system_info, current_version).await?; } else { info!( "System version {current_version} is lower than {loaded_version}, possible downgrade." ); - update_system_info(&self.storage, &mut system_info, current_version).await?; + Self::update_system_info(&self.storage, &mut system_info, current_version).await?; } Ok(()) } + async fn update_system_info( + storage: &Rc<SystemStorage>, + system_info: &mut SystemInfo, + version: &SemanticVersion, + ) -> Result<(), IggyError> { + system_info.update_version(version); + storage.info.save(system_info).await?; + Ok(()) + } + async fn load_state(&self) -> Result<SystemState, IggyError> { let state_entries = self.state.init().await.with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to initialize state entries") diff --git a/core/server/src/shard/system/storage.rs b/core/server/src/shard/system/storage.rs index 736d8e488..ac4aca3ea 100644 --- a/core/server/src/shard/system/storage.rs +++ b/core/server/src/shard/system/storage.rs @@ -65,7 +65,7 @@ impl SystemInfoStorage for FileSystemInfoStorage { let file = file::open(&self.path) .await .map_err(|_| IggyError::CannotReadFile)?; - let buffer = PooledBuffer::with_capacity(file_size); + let mut buffer = PooledBuffer::with_capacity(file_size); let (result, buffer) = file .read_exact_at(buffer.slice(0..file_size), 0) .await diff --git a/core/server/src/streaming/segments/direct_file.rs b/core/server/src/streaming/segments/direct_file.rs new file mode 100644 index 000000000..ce1ecb788 --- /dev/null +++ b/core/server/src/streaming/segments/direct_file.rs @@ -0,0 +1,443 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::streaming::utils::{ALIGNMENT, PooledBuffer}; +use compio::fs::{File, OpenOptions}; +use compio::io::AsyncWriteAtExt; +use error_set::ErrContext; +use iggy_common::IggyError; + +#[derive(Debug)] +pub struct DirectFile { + file_path: String, + file: File, + file_position: u64, + tail: PooledBuffer, + tail_len: usize, +} + +impl DirectFile { + pub async fn open( + file_path: &str, + initial_position: u64, + file_exists: bool, + ) -> Result<Self, IggyError> { + let mut file = OpenOptions::new() + .create(true) + .write(true) + .custom_flags(0x4000) + .open(file_path) + .await + .with_error_context(|err| { + format!("Failed to open file with O_DIRECT: {file_path}, error: {err}") + }) + .map_err(|_| IggyError::CannotReadFile)?; + + if !file_exists { + let init_buffer = PooledBuffer::with_capacity(ALIGNMENT); + let (write_result, _) = file.write_all_at(init_buffer, 0).await.into(); + write_result + .with_error_context(|error| { + tracing::error!( + "Failed to initialize file with dummy block: {file_path}, error: {error}" + ); + format!( + "Failed to initialize file with dummy block: {file_path}, error: {error}" + ) + }) + .map_err(|_| IggyError::CannotWriteToFile)?; + + tracing::trace!("Successfully initialized new file with dummy block: {file_path}"); + } + + tracing::trace!( + "Successfully opened DirectFile: {}, position: {}, exists: {}", + file_path, + initial_position, + file_exists + ); + + Ok(Self { + file_path: file_path.to_string(), + file, + file_position: initial_position, + tail: PooledBuffer::with_capacity(ALIGNMENT), + tail_len: 0, + }) + } + + pub async fn get_file_size(&self) -> Result<u64, IggyError> { + self.file + .metadata() + .await + .with_error_context(|error| { + format!( + "Failed to get metadata of file: {}, error: {error}", + self.file_path + ) + }) + .map_err(|_| IggyError::CannotReadFileMetadata) + .map(|metadata| metadata.len()) + } + + fn new(file: File, file_path: String, initial_position: u64) -> Self { + Self { + file_path, + file, + file_position: initial_position, + tail: PooledBuffer::with_capacity(ALIGNMENT), + tail_len: 0, + } + } + + pub async fn write_all(&mut self, mut data: &[u8]) -> Result<usize, IggyError> { + let initial_len = data.len(); + tracing::trace!( + "DirectFile write_all called for file: {}, data_len: {}, position: {}, tail_len: {}", + self.file_path, + initial_len, + self.file_position, + self.tail_len + ); + + if self.tail_len > 0 { + let need = ALIGNMENT - self.tail_len; + let take = need.min(data.len()); + self.tail.extend_from_slice(&data[..take]); + self.tail_len += take; + data = &data[take..]; + + if self.tail_len == ALIGNMENT { + self.flush_tail().await?; + } + } + + if !data.is_empty() { + let whole_sectors_end = data.len() & !(ALIGNMENT - 1); + if whole_sectors_end > 0 { + let whole_sectors = &data[..whole_sectors_end]; + let mut written = 0; + + while written < whole_sectors.len() { + let chunk_size = (whole_sectors.len() - written).min(128 * 1024 * 1024); + let chunk = &whole_sectors[written..written + chunk_size]; + + let chunk_buffer = PooledBuffer::from(chunk); + + let (result, _) = self + .file + .write_all_at(chunk_buffer, self.file_position) + .await + .into(); + + result.map_err(|e| { + tracing::error!("Failed to write to direct file: {} at position {}, chunk size: {}, error: {}", + self.file_path, self.file_position, chunk_size, e); + IggyError::CannotWriteToFile + })?; + + self.file_position += chunk_size as u64; + written += chunk_size; + } + + data = &data[whole_sectors_end..]; + } + } + + if !data.is_empty() { + self.tail.clear(); + self.tail.extend_from_slice(data); + self.tail_len = data.len(); + } + + Ok(initial_len) + } + + pub async fn flush(&mut self) -> Result<(), IggyError> { + if self.tail_len > 0 { + self.tail.resize(ALIGNMENT, 0); + self.flush_tail().await?; + } + Ok(()) + } + + pub fn position(&self) -> u64 { + self.file_position + } + + pub fn tail_len(&self) -> usize { + self.tail_len + } + + pub fn file_path(&self) -> &str { + &self.file_path + } + + pub fn tail_buffer(&self) -> &PooledBuffer { + &self.tail + } + + pub fn take_tail(&mut self) -> (PooledBuffer, usize) { + let tail = std::mem::replace(&mut self.tail, PooledBuffer::with_capacity(ALIGNMENT)); + let tail_len = self.tail_len; + self.tail_len = 0; + (tail, tail_len) + } + + pub fn set_tail(&mut self, tail: PooledBuffer, tail_len: usize) { + self.tail = tail; + self.tail_len = tail_len; + } + + async fn flush_tail(&mut self) -> Result<(), IggyError> { + assert_eq!(self.tail.len(), ALIGNMENT); + + let tail_buffer = std::mem::replace(&mut self.tail, PooledBuffer::with_capacity(ALIGNMENT)); + + let (result, returned_buf) = self + .file + .write_all_at(tail_buffer, self.file_position) + .await + .into(); + + result.map_err(|e| { + tracing::error!( + "Failed to flush tail for file: {} at position {}, tail size: {}, error: {}", + self.file_path, + self.file_position, + ALIGNMENT, + e + ); + IggyError::CannotWriteToFile + })?; + + self.file_position += ALIGNMENT as u64; + self.tail_len = 0; + self.tail = returned_buf; + self.tail.clear(); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::configs::system::SystemConfig; + use crate::streaming::utils::MemoryPool; + use compio::fs::OpenOptions; + use compio::io::{AsyncReadAt, AsyncReadAtExt}; + use std::sync::Arc; + use tempfile::tempdir; + + #[test] + fn test_direct_file_small_writes() { + compio::runtime::Runtime::new().unwrap().block_on(async { + MemoryPool::init_pool(Arc::new(SystemConfig::default())); + let temp_dir = tempdir().unwrap(); + let file_path = temp_dir.path().join("test_direct_io.bin"); + + let mut direct_file = DirectFile::open(file_path.to_str().unwrap(), 0, false) + .await + .unwrap(); + + for i in 0..10u64 { + let buf = i.to_le_bytes(); + direct_file.write_all(&buf).await.unwrap(); + } + + direct_file.flush().await.unwrap(); + + let file = OpenOptions::new() + .read(true) + .custom_flags(0x4000) + .open(&file_path) + .await + .unwrap(); + + let mut read_buffer = vec![0u8; 512]; + let (result, buf) = file.read_at(read_buffer, 0).await.into(); + result.unwrap(); + read_buffer = buf; + + for i in 0..10u64 { + let start = i as usize * 8; + let num = u64::from_le_bytes(read_buffer[start..start + 8].try_into().unwrap()); + assert_eq!(num, i, "Expected {} at position {}, got {}", i, start, num); + } + }); + } + + #[test] + fn test_direct_file_exact_sector_write() { + compio::runtime::Runtime::new().unwrap().block_on(async { + MemoryPool::init_pool(Arc::new(SystemConfig::default())); + let temp_dir = tempdir().unwrap(); + let file_path = temp_dir.path().join("test_exact_sector.bin"); + + let mut direct_file = DirectFile::open(file_path.to_str().unwrap(), 0, false) + .await + .unwrap(); + + let data = vec![42u8; ALIGNMENT]; + direct_file.write_all(&data).await.unwrap(); + + assert_eq!(direct_file.tail_len(), 0); + assert_eq!(direct_file.position(), ALIGNMENT as u64); + + let file = OpenOptions::new() + .read(true) + .custom_flags(0x4000) + .open(&file_path) + .await + .unwrap(); + + let mut read_buffer = vec![0u8; ALIGNMENT]; + let (result, buf) = file.read_at(read_buffer, 0).await.into(); + result.unwrap(); + read_buffer = buf; + + assert_eq!(read_buffer, vec![42u8; ALIGNMENT]); + }); + } + + #[test] + fn test_direct_file_multiple_sector_writes() { + compio::runtime::Runtime::new().unwrap().block_on(async { + MemoryPool::init_pool(Arc::new(SystemConfig::default())); + let temp_dir = tempdir().unwrap(); + let file_path = temp_dir.path().join("test_multiple_sectors.bin"); + + let mut direct_file = DirectFile::open(file_path.to_str().unwrap(), 0, false) + .await + .unwrap(); + + let data1 = vec![1u8; ALIGNMENT * 2]; + let data2 = vec![2u8; ALIGNMENT * 3]; + let data3 = vec![3u8; ALIGNMENT]; + + direct_file.write_all(&data1).await.unwrap(); + direct_file.write_all(&data2).await.unwrap(); + direct_file.write_all(&data3).await.unwrap(); + + let file = OpenOptions::new() + .read(true) + .custom_flags(0x4000) + .open(&file_path) + .await + .unwrap(); + + let mut read_buffer = vec![0u8; ALIGNMENT * 6]; + let (result, buf) = file.read_at(read_buffer, 0).await.into(); + result.unwrap(); + read_buffer = buf; + + assert_eq!(&read_buffer[0..ALIGNMENT * 2], &vec![1u8; ALIGNMENT * 2]); + assert_eq!( + &read_buffer[ALIGNMENT * 2..ALIGNMENT * 5], + &vec![2u8; ALIGNMENT * 3] + ); + assert_eq!( + &read_buffer[ALIGNMENT * 5..ALIGNMENT * 6], + &vec![3u8; ALIGNMENT] + ); + }); + } + + #[test] + fn test_direct_file_unaligned_write() { + compio::runtime::Runtime::new().unwrap().block_on(async { + MemoryPool::init_pool(Arc::new(SystemConfig::default())); + let temp_dir = tempdir().unwrap(); + let file_path = temp_dir.path().join("test_unaligned.bin"); + + let mut direct_file = DirectFile::open(file_path.to_str().unwrap(), 0, false) + .await + .unwrap(); + + let data = vec![77u8; 1000]; + direct_file.write_all(&data).await.unwrap(); + + assert_eq!(direct_file.tail_len(), 1000 % ALIGNMENT); + assert_eq!(direct_file.position(), ALIGNMENT as u64); + + direct_file.flush().await.unwrap(); + + assert_eq!(direct_file.tail_len(), 0); + assert_eq!(direct_file.position(), (ALIGNMENT * 2) as u64); + + let file = OpenOptions::new() + .read(true) + .custom_flags(0x4000) + .open(&file_path) + .await + .unwrap(); + + let mut read_buffer = vec![0u8; ALIGNMENT * 2]; + let (result, buf) = file.read_at(read_buffer, 0).await.into(); + result.unwrap(); + read_buffer = buf; + + assert_eq!(&read_buffer[0..1000], &vec![77u8; 1000]); + assert_eq!( + &read_buffer[1000..ALIGNMENT * 2], + &vec![0u8; ALIGNMENT * 2 - 1000] + ); + }); + } + + #[test] + fn test_direct_file_cross_sector_boundary() { + compio::runtime::Runtime::new().unwrap().block_on(async { + MemoryPool::init_pool(Arc::new(SystemConfig::default())); + let temp_dir = tempdir().unwrap(); + let file_path = temp_dir.path().join("test_cross_boundary.bin"); + + let mut direct_file = DirectFile::open(file_path.to_str().unwrap(), 0, false) + .await + .unwrap(); + + let data1 = vec![1u8; 400]; + let data2 = vec![2u8; 200]; + let data3 = vec![3u8; 100]; + + direct_file.write_all(&data1).await.unwrap(); + direct_file.write_all(&data2).await.unwrap(); + direct_file.write_all(&data3).await.unwrap(); + + assert_eq!(direct_file.tail_len(), 700 % ALIGNMENT); + + direct_file.flush().await.unwrap(); + + let file = OpenOptions::new() + .read(true) + .custom_flags(0x4000) + .open(&file_path) + .await + .unwrap(); + + let mut read_buffer = vec![0u8; ALIGNMENT * 2]; + let (result, buf) = file.read_at(read_buffer, 0).await.into(); + result.unwrap(); + read_buffer = buf; + + assert_eq!(&read_buffer[0..400], &vec![1u8; 400]); + assert_eq!(&read_buffer[400..600], &vec![2u8; 200]); + assert_eq!(&read_buffer[600..700], &vec![3u8; 100]); + }); + } +} diff --git a/core/server/src/streaming/segments/indexes/index_reader.rs b/core/server/src/streaming/segments/indexes/index_reader.rs index 48ed12b36..f0a589331 100644 --- a/core/server/src/streaming/segments/indexes/index_reader.rs +++ b/core/server/src/streaming/segments/indexes/index_reader.rs @@ -339,24 +339,15 @@ impl IndexReader { len: u32, use_pool: bool, ) -> Result<PooledBuffer, std::io::Error> { - if use_pool { - let len = len as usize; - let buf = PooledBuffer::with_capacity(len as usize); - let (result, buf) = self - .file - .read_exact_at(buf.slice(..len), offset as u64) - .await - .into(); - let buf = buf.into_inner(); - result?; - Ok(buf) - } else { - let mut buf = BytesMut::with_capacity(len as usize); - unsafe { buf.set_len(len as usize) }; - let (result, buf) = self.file.read_exact_at(buf, offset as u64).await.into(); - result?; - Ok(PooledBuffer::from_existing(buf)) - } + let mut buf = PooledBuffer::with_capacity(len as usize); + let (result, buf) = self + .file + .read_exact_at(buf.slice(0..len as usize), offset as u64) + .await + .into(); + result?; + + Ok(buf.into_inner()) } /// Gets the nth index from the index file. diff --git a/core/server/src/streaming/segments/indexes/index_writer.rs b/core/server/src/streaming/segments/indexes/index_writer.rs index 416cde219..782e633e1 100644 --- a/core/server/src/streaming/segments/indexes/index_writer.rs +++ b/core/server/src/streaming/segments/indexes/index_writer.rs @@ -16,25 +16,20 @@ * under the License. */ -use compio::fs::File; -use compio::fs::OpenOptions; -use compio::io::AsyncWriteAtExt; +use crate::streaming::segments::DirectFile; +use crate::streaming::utils::PooledBuffer; use error_set::ErrContext; -use iggy_common::INDEX_SIZE; -use iggy_common::IggyError; +use iggy_common::{INDEX_SIZE, IggyError}; use std::sync::{ Arc, atomic::{AtomicU64, Ordering}, }; use tracing::trace; -use crate::streaming::utils::PooledBuffer; - /// A dedicated struct for writing to the index file. #[derive(Debug)] pub struct IndexWriter { - file_path: String, - file: File, + direct_file: DirectFile, index_size_bytes: Arc<AtomicU64>, fsync: bool, } @@ -47,39 +42,34 @@ impl IndexWriter { fsync: bool, file_exists: bool, ) -> Result<Self, IggyError> { - let file = OpenOptions::new() - .create(true) - .write(true) - .open(file_path) - .await - .with_error_context(|error| format!("Failed to open index file: {file_path}. {error}")) - .map_err(|_| IggyError::CannotReadFile)?; + let file_position = if file_exists { + let current_size = index_size_bytes.load(Ordering::Acquire); + (current_size + 511) & !511 + } else { + index_size_bytes.store(0, Ordering::Release); + 0 + }; - if file_exists { - let _ = file.sync_all().await.with_error_context(|error| { - format!("Failed to fsync index file after creation: {file_path}. {error}",) - }); + trace!( + "Opening index file for writing: {file_path}, file_position: {}", + file_position + ); - let actual_index_size = file - .metadata() - .await - .with_error_context(|error| { - format!("Failed to get metadata of index file: {file_path}. {error}") - }) - .map_err(|_| IggyError::CannotReadFileMetadata)? - .len(); + let mut direct_file = DirectFile::open(file_path, file_position, file_exists).await?; + if file_exists { + let actual_index_size = direct_file.get_file_size().await?; index_size_bytes.store(actual_index_size, Ordering::Release); - } - trace!( - "Opened index file for writing: {file_path}, size: {}", - index_size_bytes.load(Ordering::Acquire) - ); + trace!( + "Opened existing index file: {file_path}, size: {}, file_position: {}", + actual_index_size, + direct_file.position() + ); + } Ok(Self { - file_path: file_path.to_string(), - file, + direct_file, index_size_bytes, fsync, }) @@ -92,44 +82,33 @@ impl IndexWriter { } let count = indexes.len() / INDEX_SIZE; - let len = indexes.len(); + let actual_len = indexes.len(); - let position = self.index_size_bytes.load(Ordering::Relaxed); - self.file - .write_all_at(indexes, position) + trace!( + "Saving {count} indexes to file: {} (size: {} bytes)", + self.direct_file.file_path(), + actual_len + ); + + let bytes_written = self + .direct_file + .write_all(&indexes) .await - .0 .with_error_context(|error| { format!( "Failed to write {} indexes to file: {}. {error}", - count, self.file_path + count, + self.direct_file.file_path() ) }) .map_err(|_| IggyError::CannotSaveIndexToSegment)?; + let new_logical_size = self.index_size_bytes.load(Ordering::Relaxed) + bytes_written as u64; self.index_size_bytes - .fetch_add(len as u64, Ordering::Release); - - if self.fsync { - let _ = self.fsync().await; - } - trace!( - "Saved {count} indexes of size {} to file: {}", - INDEX_SIZE * count, - self.file_path - ); + .store(new_logical_size, Ordering::Release); - Ok(()) - } + trace!("Saved {count} indexes. Logical size: {}", new_logical_size); - pub async fn fsync(&self) -> Result<(), IggyError> { - self.file - .sync_all() - .await - .with_error_context(|error| { - format!("Failed to fsync index file: {}. {error}", self.file_path) - }) - .map_err(|_| IggyError::CannotWriteToFile)?; Ok(()) } } diff --git a/core/server/src/streaming/segments/messages/messages_reader.rs b/core/server/src/streaming/segments/messages/messages_reader.rs index bd318fc9c..5ad235022 100644 --- a/core/server/src/streaming/segments/messages/messages_reader.rs +++ b/core/server/src/streaming/segments/messages/messages_reader.rs @@ -177,23 +177,13 @@ impl MessagesReader { len: u32, use_pool: bool, ) -> Result<PooledBuffer, std::io::Error> { - if use_pool { - let mut buf = PooledBuffer::with_capacity(len as usize); - let len = len as usize; - let (result, buf) = self - .file - .read_exact_at(buf.slice(..len), offset as u64) - .await - .into(); - let buf = buf.into_inner(); - result?; - Ok(buf) - } else { - let mut buf = BytesMut::with_capacity(len as usize); - unsafe { buf.set_len(len as usize) }; - let (result, buf) = self.file.read_exact_at(buf, offset as u64).await.into(); - result?; - Ok(PooledBuffer::from_existing(buf)) - } + let mut buf = PooledBuffer::with_capacity(len as usize); + let (result, buf) = self + .file + .read_exact_at(buf.slice(0..len as usize), offset as u64) + .await + .into(); + result?; + Ok(buf.into_inner()) } } diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs b/core/server/src/streaming/segments/messages/messages_writer.rs index f3d43d1e5..ad6a09680 100644 --- a/core/server/src/streaming/segments/messages/messages_writer.rs +++ b/core/server/src/streaming/segments/messages/messages_writer.rs @@ -16,8 +16,9 @@ * under the License. */ -use crate::streaming::segments::{IggyMessagesBatchSet, messages::write_batch}; -use compio::fs::{File, OpenOptions}; +use crate::streaming::segments::{ + DirectFile, IggyMessagesBatchSet, messages::write_batch_with_direct_file, +}; use error_set::ErrContext; use iggy_common::{IggyByteSize, IggyError}; use std::sync::{ @@ -29,68 +30,51 @@ use tracing::{error, trace}; /// A dedicated struct for writing to the messages file. #[derive(Debug)] pub struct MessagesWriter { - file_path: String, - /// Holds the file for synchronous writes; when asynchronous persistence is enabled, this will be None. - file: Option<File>, - /// When set, asynchronous writes are handled by this persister task. + direct_file: Option<DirectFile>, messages_size_bytes: Arc<AtomicU64>, fsync: bool, } impl MessagesWriter { - /// Opens the messages file in write mode. - /// - /// If the server confirmation is set to `NoWait`, the file handle is transferred to the - /// persister task (and stored in `persister_task`) so that writes are done asynchronously. - /// Otherwise, the file is retained in `self.file` for synchronous writes. pub async fn new( file_path: &str, messages_size_bytes: Arc<AtomicU64>, fsync: bool, file_exists: bool, ) -> Result<Self, IggyError> { - let file = OpenOptions::new() - .create(true) - .write(true) - .open(file_path) - .await - .with_error_context(|err| { - format!("Failed to open messages file: {file_path}, error: {err}") - }) - .map_err(|_| IggyError::CannotReadFile)?; + let file_position = if file_exists { + let current_size = messages_size_bytes.load(Ordering::Acquire); + (current_size + 511) & !511 + } else { + messages_size_bytes.store(0, Ordering::Release); + 0 + }; - if file_exists { - let _ = file.sync_all().await.with_error_context(|error| { - format!("Failed to fsync messages file after creation: {file_path}, error: {error}") - }); + trace!( + "Opening messages file for writing: {file_path}, file_position: {}", + file_position + ); - let actual_messages_size = file - .metadata() - .await - .with_error_context(|error| { - format!("Failed to get metadata of messages file: {file_path}, error: {error}") - }) - .map_err(|_| IggyError::CannotReadFileMetadata)? - .len(); + let mut direct_file = DirectFile::open(file_path, file_position, file_exists).await?; + if file_exists { + let actual_messages_size = direct_file.get_file_size().await?; messages_size_bytes.store(actual_messages_size, Ordering::Release); - } - trace!( - "Opened messages file for writing: {file_path}, size: {}", - messages_size_bytes.load(Ordering::Acquire) - ); + trace!( + "Opened existing messages file: {file_path}, size: {}, file_position: {}", + actual_messages_size, + direct_file.position() + ); + } - let file = Some(file); Ok(Self { - file_path: file_path.to_string(), - file, + direct_file: Some(direct_file), messages_size_bytes, fsync, }) } - /// Append a batch of messages to the messages file. pub async fn save_batch_set( &mut self, batch_set: IggyMessagesBatchSet, @@ -98,49 +82,34 @@ impl MessagesWriter { let messages_size = batch_set.size(); let messages_count = batch_set.count(); let containers_count = batch_set.containers_count(); - trace!( - "Saving batch set of size {messages_size} bytes ({containers_count} containers, {messages_count} messages) to messages file: {}", - self.file_path - ); - let position = self.messages_size_bytes.load(Ordering::Relaxed); - if let Some(ref mut file) = self.file { - write_batch(file, position, batch_set) + let actual_written = if let Some(ref mut direct_file) = self.direct_file { + trace!( + "Saving batch set of size {messages_size} bytes ({containers_count} containers, {messages_count} messages) to messages file: {}", + direct_file.file_path() + ); + + write_batch_with_direct_file(direct_file, batch_set) .await .with_error_context(|error| { format!( "Failed to write batch to messages file: {}. {error}", - self.file_path + direct_file.file_path() ) - })?; + })? } else { - error!("File handle is not available for synchronous write."); + tracing::error!("File handle is not available for synchronous write."); return Err(IggyError::CannotWriteToFile); - } - - if self.fsync { - let _ = self.fsync().await; - } + }; + let logical_size = self.messages_size_bytes.load(Ordering::Relaxed) + actual_written as u64; self.messages_size_bytes - .fetch_add(messages_size as u64, Ordering::Release); + .store(logical_size, Ordering::Release); + trace!( - "Written batch set of size {messages_size} bytes ({containers_count} containers, {messages_count} messages) to disk messages file: {}", - self.file_path + "Written batch set of size {messages_size} bytes to disk. Logical size: {}", + logical_size ); Ok(IggyByteSize::from(messages_size as u64)) } - - pub async fn fsync(&self) -> Result<(), IggyError> { - if let Some(file) = self.file.as_ref() { - file.sync_all() - .await - .with_error_context(|error| { - format!("Failed to fsync messages file: {}. {error}", self.file_path) - }) - .map_err(|_| IggyError::CannotWriteToFile)?; - } - - Ok(()) - } } diff --git a/core/server/src/streaming/segments/messages/mod.rs b/core/server/src/streaming/segments/messages/mod.rs index a3989a2eb..537ecb78c 100644 --- a/core/server/src/streaming/segments/messages/mod.rs +++ b/core/server/src/streaming/segments/messages/mod.rs @@ -19,25 +19,27 @@ mod messages_reader; mod messages_writer; -use super::IggyMessagesBatchSet; -use compio::{fs::File, io::AsyncWriteAtExt}; +use super::{DirectFile, IggyMessagesBatchSet}; +use crate::streaming::utils::PooledBuffer; use iggy_common::IggyError; pub use messages_reader::MessagesReader; pub use messages_writer::MessagesWriter; -/// Vectored write a batches of messages to file -async fn write_batch( - file: &mut File, - position: u64, +async fn write_batch_with_direct_file( + direct_file: &mut DirectFile, mut batches: IggyMessagesBatchSet, ) -> Result<usize, IggyError> { - let total_written = batches.iter().map(|b| b.size() as usize).sum(); - let batches = batches - .iter_mut() - .map(|b| b.take_messages()) - .collect::<Vec<_>>(); - let (result, _) = file.write_vectored_all_at(batches, position).await.into(); - result.map_err(|_| IggyError::CannotWriteToFile)?; + let total_written: usize = batches.iter().map(|b| b.size() as usize).sum(); + let mut messages_count = 0; + + for batch in batches.iter_mut() { + messages_count += batch.count(); + let messages = batch.take_messages(); + direct_file.write_all(&messages).await?; + } + + tracing::trace!("Saved {} messages", messages_count); + Ok(total_written) } diff --git a/core/server/src/streaming/segments/mod.rs b/core/server/src/streaming/segments/mod.rs index f3105e547..9fbd4abe6 100644 --- a/core/server/src/streaming/segments/mod.rs +++ b/core/server/src/streaming/segments/mod.rs @@ -16,6 +16,7 @@ * under the License. */ +mod direct_file; mod indexes; mod messages; mod messages_accumulator; @@ -24,6 +25,7 @@ mod segment; mod types; mod writing_messages; +pub use direct_file::DirectFile; pub use indexes::IggyIndexesMut; pub use messages_accumulator::MessagesAccumulator; pub use segment::Segment; diff --git a/core/server/src/streaming/segments/segment.rs b/core/server/src/streaming/segments/segment.rs index a3ac8588f..85dc3abb8 100644 --- a/core/server/src/streaming/segments/segment.rs +++ b/core/server/src/streaming/segments/segment.rs @@ -92,6 +92,7 @@ impl Segment { IggyExpiry::ServerDefault => config.segment.message_expiry, _ => message_expiry, }; + Segment { stream_id, topic_id, @@ -150,22 +151,16 @@ impl Segment { .with_error_context(|error| format!("Failed to load indexes for {self}. {error}")) .map_err(|_| IggyError::CannotReadFile)?; - info!( - "Loaded {} indexes for segment with start offset: {}, end offset: {}, and partition with ID: {}, topic with ID: {}, and stream with ID: {}.", - self.indexes.as_ref().map_or(0, |idx| idx.count()), - self.start_offset, - self.end_offset, - self.partition_id, - self.topic_id, - self.stream_id - ); - let last_index_offset = if loaded_indexes.is_empty() { 0_u64 } else { loaded_indexes.last().unwrap().offset() as u64 }; + if !loaded_indexes.is_empty() { + self.indexes = Some(loaded_indexes); + } + self.end_offset = self.start_offset + last_index_offset; info!( @@ -306,29 +301,8 @@ impl Segment { } pub async fn shutdown_writing(&mut self) { - if let Some(log_writer) = self.messages_writer.take() { - //TODO: Fixme not sure whether we should spawn a task here. - compio::runtime::spawn(async move { - let _ = log_writer.fsync().await; - }) - .detach(); - } else { - warn!( - "Log writer already closed when calling close() for {}", - self - ); - } - - if let Some(index_writer) = self.index_writer.take() { - //TODO: Fixme not sure whether we should spawn a task here. - compio::runtime::spawn(async move { - let _ = index_writer.fsync().await; - drop(index_writer) - }) - .detach(); - } else { - warn!("Index writer already closed when calling close()"); - } + let _ = self.messages_writer.take().map(|mut writer| {}); + let _ = self.index_writer.take().map(|mut writer| {}); } pub async fn delete(&mut self) -> Result<(), IggyError> { diff --git a/core/server/src/streaming/segments/types/messages_batch_mut.rs b/core/server/src/streaming/segments/types/messages_batch_mut.rs index 764faa112..7152f53d5 100644 --- a/core/server/src/streaming/segments/types/messages_batch_mut.rs +++ b/core/server/src/streaming/segments/types/messages_batch_mut.rs @@ -488,7 +488,7 @@ impl IggyMessagesBatchMut { /// subsequent messages in the new buffer. #[allow(clippy::too_many_arguments)] fn rebuild_indexes_for_chunk( - new_buffer: &BytesMut, + new_buffer: &PooledBuffer, new_indexes: &mut IggyIndexesMut, offset_in_new_buffer: &mut u32, chunk_start: usize, @@ -571,7 +571,7 @@ impl IggyMessagesBatchMut { for &(start, end) in &boundaries_to_remove { if start > last_pos { - let keep_len = start - last_pos; + let keep_len: usize = start - last_pos; let chunk = source.split_to(keep_len); let chunk_start_in_new_buffer = new_buffer.len(); new_buffer.put(chunk); @@ -684,7 +684,8 @@ impl IggyMessagesBatchMut { prev_offset = message.header().offset(); prev_position = index.position(); - messages_size += message.size(); + let msg_size = message.size(); + messages_size += msg_size; messages_count += 1; } @@ -822,11 +823,3 @@ impl Index<usize> for IggyMessagesBatchMut { &self.messages[start..end] } } - -impl Deref for IggyMessagesBatchMut { - type Target = BytesMut; - - fn deref(&self) -> &Self::Target { - &self.messages - } -} diff --git a/core/server/src/streaming/segments/writing_messages.rs b/core/server/src/streaming/segments/writing_messages.rs index e2b0fbb99..0d9b9f123 100644 --- a/core/server/src/streaming/segments/writing_messages.rs +++ b/core/server/src/streaming/segments/writing_messages.rs @@ -113,14 +113,20 @@ impl Segment { self.indexes.as_mut().unwrap().mark_saved(); if self.config.segment.cache_indexes == CacheIndexesConfig::None { - self.indexes.as_mut().unwrap().clear(); + if let Some(indexes) = self.indexes.as_mut() { + indexes.clear(); + } } self.check_and_handle_segment_full().await?; trace!( - "Saved {} messages on disk in segment with start offset: {} for partition with ID: {}, total bytes written: {}.", - unsaved_messages_count, self.start_offset, self.partition_id, saved_bytes + "Saved {} messages on disk in segment with start offset: {}, end offset: {}, for partition with ID: {}, total bytes written: {}.", + unsaved_messages_count, + self.start_offset, + self.end_offset, + self.partition_id, + saved_bytes ); Ok(unsaved_messages_count) diff --git a/core/server/src/streaming/utils/memory_pool.rs b/core/server/src/streaming/utils/memory_pool.rs index 2cc031d3b..df0783b14 100644 --- a/core/server/src/streaming/utils/memory_pool.rs +++ b/core/server/src/streaming/utils/memory_pool.rs @@ -17,7 +17,7 @@ */ use crate::configs::system::SystemConfig; -use bytes::BytesMut; +use aligned_vec::{AVec, ConstAlign}; use crossbeam::queue::ArrayQueue; use human_repr::HumanCount; use once_cell::sync::OnceCell; @@ -26,15 +26,18 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use tracing::{error, info, trace, warn}; +pub const ALIGNMENT: usize = 512; +pub type Align512 = ConstAlign<ALIGNMENT>; +pub type AlignedBuffer = AVec<u8, Align512>; + /// Global memory pool instance. Use `memory_pool()` to access it. pub static MEMORY_POOL: OnceCell<MemoryPool> = OnceCell::new(); /// Total number of distinct bucket sizes. -const NUM_BUCKETS: usize = 32; +const NUM_BUCKETS: usize = 31; /// Array of bucket sizes in ascending order. Each entry is a distinct buffer size (in bytes). const BUCKET_SIZES: [usize; NUM_BUCKETS] = [ - 256, 512, 1024, 2 * 1024, @@ -49,8 +52,8 @@ const BUCKET_SIZES: [usize; NUM_BUCKETS] = [ 768 * 1024, 1024 * 1024, 1536 * 1024, - 2 * 1024 * 1024, // Above 2MiB everything should be rounded up to the next power of 2 to take advantage of hugepages - 4 * 1024 * 1024, // (environment variables MIMALLOC_ALLOW_LARGE_OS_PAGES=1 and MIMALLOC_LARGE_OS_PAGES=1). + 2 * 1024 * 1024, + 4 * 1024 * 1024, 6 * 1024 * 1024, 8 * 1024 * 1024, 10 * 1024 * 1024, @@ -75,7 +78,7 @@ pub fn memory_pool() -> &'static MemoryPool { .expect("Memory pool not initialized - MemoryPool::init_pool should be called first") } -/// A memory pool that maintains fixed-size buckets for reusing `BytesMut` buffers. +/// A memory pool that maintains fixed-size buckets for reusing `AlignedBuffer` buffers. /// /// Each bucket corresponds to a particular size in `BUCKET_SIZES`. The pool tracks: /// - Buffers currently in use (`in_use`) @@ -97,7 +100,7 @@ pub struct MemoryPool { /// Array of queues for reusable buffers. Each queue can store up to `bucket_capacity` buffers. /// The length of each queue (`buckets[i].len()`) is how many **free** buffers are currently available. /// Free doesn't mean the buffer is allocated, it just means it's not in use. - buckets: [Arc<ArrayQueue<BytesMut>>; NUM_BUCKETS], + buckets: [Arc<ArrayQueue<AlignedBuffer>>; NUM_BUCKETS], /// Number of buffers **in use** for each bucket size (grow/shrink as they are acquired/released). in_use: [Arc<AtomicUsize>; NUM_BUCKETS], @@ -163,16 +166,16 @@ impl MemoryPool { MEMORY_POOL.get_or_init(|| MemoryPool::new(is_enabled, memory_limit, bucket_capacity)); } - /// Acquire a `BytesMut` buffer with at least `capacity` bytes. + /// Acquire a `AlignedBuffer` buffer with at least `capacity` bytes. /// /// - If a bucket can fit `capacity`, try to pop from its free buffer queue; otherwise create a new buffer. /// - If `memory_limit` would be exceeded, allocate outside the pool. /// /// Returns a tuple of (buffer, was_pool_allocated) where was_pool_allocated indicates if the buffer /// was allocated from the pool (true) or externally (false). - pub fn acquire_buffer(&self, capacity: usize) -> (BytesMut, bool) { + pub fn acquire_buffer(&self, capacity: usize) -> (AlignedBuffer, bool) { if !self.is_enabled { - return (BytesMut::with_capacity(capacity), false); + return (AlignedBuffer::with_capacity(ALIGNMENT, capacity), false); } let current = self.pool_current_size(); @@ -193,12 +196,12 @@ impl MemoryPool { new_size, current, self.memory_limit ); self.inc_external_allocations(); - return (BytesMut::with_capacity(new_size), false); + return (AlignedBuffer::with_capacity(ALIGNMENT, new_size), false); } self.inc_bucket_alloc(idx); self.inc_bucket_in_use(idx); - (BytesMut::with_capacity(new_size), true) + (AlignedBuffer::with_capacity(ALIGNMENT, new_size), true) } None => { if current + capacity > self.memory_limit { @@ -207,16 +210,16 @@ impl MemoryPool { capacity, current, self.memory_limit ); self.inc_external_allocations(); - return (BytesMut::with_capacity(capacity), false); + return (AlignedBuffer::with_capacity(ALIGNMENT, capacity), false); } self.inc_external_allocations(); - (BytesMut::with_capacity(capacity), false) + (AlignedBuffer::with_capacity(ALIGNMENT, capacity), false) } } } - /// Return a `BytesMut` buffer previously acquired from the pool. + /// Return a `AlignedBuffer` buffer previously acquired from the pool. /// /// - If `current_capacity` differs from `original_capacity`, increments `resize_events`. /// - If a matching bucket exists, place it back in that bucket's queue (if space is available). @@ -224,7 +227,7 @@ impl MemoryPool { /// - The `was_pool_allocated` flag indicates if this buffer was originally allocated from the pool. pub fn release_buffer( &self, - buffer: BytesMut, + buffer: AlignedBuffer, original_capacity: usize, was_pool_allocated: bool, ) { @@ -235,10 +238,6 @@ impl MemoryPool { let current_capacity = buffer.capacity(); if current_capacity != original_capacity { self.inc_resize_events(); - trace!( - "Buffer capacity {} != original {} when returning", - current_capacity, original_capacity - ); } if was_pool_allocated { @@ -438,11 +437,11 @@ impl MemoryPool { /// Return a buffer to the pool by calling `release_buffer` with the original capacity. /// This extension trait makes it easy to do `some_bytes.return_to_pool(orig_cap, was_pool_allocated)`. -pub trait BytesMutExt { +pub trait AlignedBufferExt { fn return_to_pool(self, original_capacity: usize, was_pool_allocated: bool); } -impl BytesMutExt for BytesMut { +impl AlignedBufferExt for AlignedBuffer { fn return_to_pool(self, original_capacity: usize, was_pool_allocated: bool) { memory_pool().release_buffer(self, original_capacity, was_pool_allocated); } @@ -667,27 +666,6 @@ mod tests { ); } - // Test put_bytes - { - let initial_events = pool.resize_events(); - let mut buffer = PooledBuffer::with_capacity(4 * 1024); - let orig_bucket_idx = pool.best_fit(buffer.capacity()).unwrap(); - let orig_in_use = pool.bucket_current_elements(orig_bucket_idx); - - buffer.put_bytes(0, 64 * 1024); // 64KiB of zeros - - assert_eq!( - pool.resize_events(), - initial_events + 1, - "put_bytes should trigger resize event" - ); - assert_eq!( - pool.bucket_current_elements(orig_bucket_idx), - orig_in_use - 1, - "put_bytes should update bucket accounting" - ); - } - // Test extend_from_slice { let initial_events = pool.resize_events(); diff --git a/core/server/src/streaming/utils/mod.rs b/core/server/src/streaming/utils/mod.rs index e441a051b..d858d684b 100644 --- a/core/server/src/streaming/utils/mod.rs +++ b/core/server/src/streaming/utils/mod.rs @@ -25,5 +25,5 @@ pub mod random_id; mod memory_pool; mod pooled_buffer; -pub use memory_pool::{MemoryPool, memory_pool}; +pub use memory_pool::{ALIGNMENT, MemoryPool, memory_pool}; pub use pooled_buffer::PooledBuffer; diff --git a/core/server/src/streaming/utils/pooled_buffer.rs b/core/server/src/streaming/utils/pooled_buffer.rs index ee6f20148..b3769bf27 100644 --- a/core/server/src/streaming/utils/pooled_buffer.rs +++ b/core/server/src/streaming/utils/pooled_buffer.rs @@ -16,8 +16,8 @@ * under the License. */ -use super::memory_pool::{BytesMutExt, memory_pool}; -use bytes::{Buf, BufMut, BytesMut}; +use super::memory_pool::{AlignedBuffer, AlignedBufferExt, memory_pool}; +use crate::streaming::utils::memory_pool::ALIGNMENT; use compio::buf::{IoBuf, IoBufMut, SetBufInit}; use std::ops::{Deref, DerefMut}; @@ -26,7 +26,7 @@ pub struct PooledBuffer { from_pool: bool, original_capacity: usize, original_bucket_idx: Option<usize>, - inner: BytesMut, + inner: AlignedBuffer, } impl Default for PooledBuffer { @@ -42,13 +42,14 @@ impl PooledBuffer { /// /// * `capacity` - The capacity of the buffer pub fn with_capacity(capacity: usize) -> Self { - let (buffer, was_pool_allocated) = memory_pool().acquire_buffer(capacity); + let (mut buffer, was_pool_allocated) = memory_pool().acquire_buffer(capacity); let original_capacity = buffer.capacity(); let original_bucket_idx = if was_pool_allocated { memory_pool().best_fit(original_capacity) } else { None }; + Self { from_pool: was_pool_allocated, original_capacity, @@ -57,27 +58,13 @@ impl PooledBuffer { } } - /// Creates a new pooled buffer from an existing `BytesMut`. - /// - /// # Arguments - /// - /// * `existing` - The existing `BytesMut` buffer - pub fn from_existing(existing: BytesMut) -> Self { - Self { - from_pool: false, - original_capacity: existing.capacity(), - original_bucket_idx: None, - inner: existing, - } - } - /// Creates an empty pooled buffer. pub fn empty() -> Self { Self { from_pool: false, original_capacity: 0, original_bucket_idx: None, - inner: BytesMut::new(), + inner: AlignedBuffer::new(ALIGNMENT), } } @@ -90,6 +77,11 @@ impl PooledBuffer { let current_capacity = self.inner.capacity(); if current_capacity != self.original_capacity { + tracing::error!( + "Pooled buffer resized from {} to {}", + self.original_capacity, + current_capacity + ); memory_pool().inc_resize_events(); if let Some(orig_idx) = self.original_bucket_idx { @@ -131,49 +123,111 @@ impl PooledBuffer { } } - /// Wrapper for put_bytes which might cause resize - pub fn put_bytes(&mut self, byte: u8, len: usize) { + /// Wrapper for put_slice which might cause resize + pub fn put_slice(&mut self, src: &[u8]) { let before_cap = self.inner.capacity(); - self.inner.put_bytes(byte, len); + self.extend_from_slice(src); if self.inner.capacity() != before_cap { self.check_for_resize(); } } - /// Wrapper for put_slice which might cause resize - pub fn put_slice(&mut self, src: &[u8]) { + /// Wrapper for put_u32_le which might cause resize + pub fn put_u32_le(&mut self, value: u32) { let before_cap = self.inner.capacity(); - self.inner.put_slice(src); + self.reserve(4); + self.inner.extend_from_slice(&value.to_le_bytes()); if self.inner.capacity() != before_cap { self.check_for_resize(); } } - /// Wrapper for put_u32_le which might cause resize - pub fn put_u32_le(&mut self, value: u32) { + /// Wrapper for put_u64_le which might cause resize + pub fn put_u64_le(&mut self, value: u64) { let before_cap = self.inner.capacity(); - self.inner.put_u32_le(value); + self.reserve(8); + self.inner.extend_from_slice(&value.to_le_bytes()); if self.inner.capacity() != before_cap { self.check_for_resize(); } } - /// Wrapper for put_u64_le which might cause resize - pub fn put_u64_le(&mut self, value: u64) { + /// Get a slice of the buffer's contents + pub fn as_slice(&self) -> &[u8] { + self.inner.as_slice() + } + + /// Get the length of the buffer + pub fn len(&self) -> usize { + self.inner.len() + } + + /// Check if the buffer is empty + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + /// Get the capacity of the buffer + pub fn capacity(&self) -> usize { + self.inner.capacity() + } + + /// Clear the buffer + pub fn clear(&mut self) { + self.inner.clear() + } + + /// Resize the buffer + pub fn resize(&mut self, new_len: usize, value: u8) { let before_cap = self.inner.capacity(); - self.inner.put_u64_le(value); + self.inner.resize(new_len, value); if self.inner.capacity() != before_cap { self.check_for_resize(); } } + + /// Split the buffer at the given index, returning the data before the split + /// and keeping the data after the split in self + pub fn split_to(&mut self, at: usize) -> Vec<u8> { + if at > self.inner.len() { + panic!("split_to out of bounds"); + } + + let mut result = Vec::with_capacity(at); + result.extend_from_slice(&self.inner[..at]); + + let remaining = self.inner.len() - at; + let mut new_data = Vec::with_capacity(remaining); + new_data.extend_from_slice(&self.inner[at..]); + + self.inner.clear(); + self.inner.extend_from_slice(&new_data); + + result + } + + /// Put bytes from a slice + pub fn put<T: AsRef<[u8]>>(&mut self, data: T) { + self.extend_from_slice(data.as_ref()); + } + + /// Align the buffer length to the next 512-byte boundary by padding with zeros + pub fn align(&mut self) { + let current_len = self.inner.len(); + let aligned_len = (current_len + 511) & !511; + if aligned_len > current_len { + let padding = aligned_len - current_len; + self.resize(aligned_len, 0); + } + } } impl Deref for PooledBuffer { - type Target = BytesMut; + type Target = AlignedBuffer; fn deref(&self) -> &Self::Target { &self.inner @@ -189,12 +243,18 @@ impl DerefMut for PooledBuffer { impl Drop for PooledBuffer { fn drop(&mut self) { if self.from_pool { - let buf = std::mem::take(&mut self.inner); + let buf = std::mem::replace(&mut self.inner, AlignedBuffer::new(ALIGNMENT)); buf.return_to_pool(self.original_capacity, true); } } } +impl AsRef<[u8]> for PooledBuffer { + fn as_ref(&self) -> &[u8] { + self.inner.as_slice() + } +} + impl From<&[u8]> for PooledBuffer { fn from(slice: &[u8]) -> Self { let mut buf = PooledBuffer::with_capacity(slice.len()); @@ -203,30 +263,10 @@ impl From<&[u8]> for PooledBuffer { } } -impl Buf for PooledBuffer { - fn remaining(&self) -> usize { - self.inner.remaining() - } - - fn chunk(&self) -> &[u8] { - self.inner.chunk() - } - - fn advance(&mut self, cnt: usize) { - self.inner.advance(cnt) - } - - fn chunks_vectored<'t>(&'t self, dst: &mut [std::io::IoSlice<'t>]) -> usize { - self.inner.chunks_vectored(dst) - } -} - impl SetBufInit for PooledBuffer { unsafe fn set_buf_init(&mut self, len: usize) { - if self.inner.len() <= len { - unsafe { - self.inner.set_len(len); - } + unsafe { + self.inner.set_len(len); } } } @@ -239,7 +279,7 @@ unsafe impl IoBufMut for PooledBuffer { unsafe impl IoBuf for PooledBuffer { fn as_buf_ptr(&self) -> *const u8 { - self.inner.as_buf_ptr() + self.inner.as_ptr() } fn buf_len(&self) -> usize { diff --git a/core/server/src/tcp/connection_handler.rs b/core/server/src/tcp/connection_handler.rs index 9c6803b8f..1f1bb4827 100644 --- a/core/server/src/tcp/connection_handler.rs +++ b/core/server/src/tcp/connection_handler.rs @@ -49,7 +49,7 @@ pub(crate) async fn handle_connection( loop { let read_future = sender.read(length_buffer.clone()); - let (read_length, initial_buffer) = futures::select! { + let initial_buffer = futures::select! { _ = stop_receiver.recv().fuse() => { info!("Connection stop signal received for session: {}", session); let _ = sender.send_error_response(IggyError::Disconnected).await; @@ -57,8 +57,8 @@ pub(crate) async fn handle_connection( } result = read_future.fuse() => { match result { - (Ok(read_length), initial_buffer) => (read_length, initial_buffer), - (Err(error), _) => { + Ok(initial_buffer) => initial_buffer, + Err(error) => { if error.as_code() == IggyError::ConnectionClosed.as_code() { return Err(ConnectionError::from(error)); } else { @@ -71,9 +71,9 @@ pub(crate) async fn handle_connection( } }; - if read_length != INITIAL_BYTES_LENGTH { + if initial_buffer.len() != INITIAL_BYTES_LENGTH { sender.send_error_response(IggyError::CommandLengthError(format!( - "Unable to read the TCP request length, expected: {INITIAL_BYTES_LENGTH} bytes, received: {read_length} bytes." + "Unable to read the TCP request length, expected: {INITIAL_BYTES_LENGTH} bytes, received: {} bytes.", initial_buffer.len() ))).await?; continue; } @@ -81,8 +81,7 @@ pub(crate) async fn handle_connection( let initial_buffer = initial_buffer.freeze(); let length = u32::from_le_bytes(initial_buffer[0..INITIAL_BYTES_LENGTH].try_into().unwrap()); - let (res, code_buffer) = sender.read(code_buffer.clone()).await; - let _ = res?; + let code_buffer = sender.read(code_buffer.clone()).await?; let code_buffer = code_buffer.freeze(); let code: u32 = u32::from_le_bytes(code_buffer[0..INITIAL_BYTES_LENGTH].try_into().unwrap()); diff --git a/core/server/src/tcp/sender.rs b/core/server/src/tcp/sender.rs index 2c0ecfa9c..f757a69b6 100644 --- a/core/server/src/tcp/sender.rs +++ b/core/server/src/tcp/sender.rs @@ -23,31 +23,25 @@ use compio::{ io::{AsyncRead, AsyncReadAtExt, AsyncReadExt, AsyncWriteExt}, }; use iggy_common::IggyError; -use nix::libc; -use std::io::IoSlice; use tracing::{debug, error}; use crate::streaming::utils::PooledBuffer; const STATUS_OK: &[u8] = &[0; 4]; -pub(crate) async fn read<T, B>(stream: &mut T, buffer: B) -> (Result<usize, IggyError>, B) +pub(crate) async fn read<T, B>(stream: &mut T, buffer: B) -> Result<B, IggyError> where T: AsyncReadExt + AsyncWriteExt + Unpin, B: IoBufMut, { let BufResult(result, buffer) = stream.read_exact(buffer).await; - match (result, buffer) { - (Ok(_), buffer) => (Ok(buffer.buf_len()), buffer), - // TODO: How to handle this ?(Ok(0), buffer) => (Err(IggyError::ConnectionClosed), buffer), - // `read_exact` from compio doesn't return how many bytes it read. - (Err(error), buffer) => { + match result { + Ok(_) => Ok(buffer), + Err(error) => { if error.kind() == std::io::ErrorKind::UnexpectedEof { - //error!("Got some error tho.. {}", error); - (Err(IggyError::ConnectionClosed), buffer) + Err(IggyError::ConnectionClosed) } else { - //error!("Got some other error tho.. {}", error); - (Err(IggyError::TcpError), buffer) + Err(IggyError::TcpError) } } } diff --git a/core/server/src/tcp/tcp_sender.rs b/core/server/src/tcp/tcp_sender.rs index d0a1c27c9..ccdc95d24 100644 --- a/core/server/src/tcp/tcp_sender.rs +++ b/core/server/src/tcp/tcp_sender.rs @@ -20,13 +20,11 @@ use crate::binary::sender::Sender; use crate::streaming::utils::PooledBuffer; use crate::tcp::COMPONENT; use crate::{server_error::ServerError, tcp::sender}; -use bytes::BytesMut; -use compio::buf::{IoBuf, IoBufMut}; +use compio::buf::IoBufMut; use compio::io::AsyncWrite; use compio::net::TcpStream; use error_set::ErrContext; use iggy_common::IggyError; -use nix::libc; #[derive(Debug)] pub struct TcpSender { @@ -34,7 +32,7 @@ pub struct TcpSender { } impl Sender for TcpSender { - async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<usize, IggyError>, B) { + async fn read<B: IoBufMut>(&mut self, buffer: B) -> Result<B, IggyError> { sender::read(&mut self.stream, buffer).await } diff --git a/core/server/src/tcp/tcp_tls_sender.rs b/core/server/src/tcp/tcp_tls_sender.rs index 8dbb3972b..f0dfcdf49 100644 --- a/core/server/src/tcp/tcp_tls_sender.rs +++ b/core/server/src/tcp/tcp_tls_sender.rs @@ -35,7 +35,7 @@ pub struct TcpTlsSender { } impl Sender for TcpTlsSender { - async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<usize, IggyError>, B) { + async fn read<B: IoBufMut>(&mut self, buffer: B) -> Result<B, IggyError> { todo!(); sender::read(&mut self.stream, buffer).await }
