This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new d5da51de feat(io_uring): replace tokio s3 crate (#2020)
d5da51de is described below
commit d5da51de4596663adc0966bce96ab05009a7d101
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Fri Jul 18 16:15:37 2025 +0200
feat(io_uring): replace tokio s3 crate (#2020)
---
Cargo.lock | 523 +++++++++++++--------
Cargo.toml | 1 +
core/configs/server.toml | 8 +-
core/integration/tests/archiver/disk.rs | 33 +-
core/integration/tests/archiver/mod.rs | 3 +-
core/integration/tests/archiver/s3.rs | 38 --
core/integration/tests/state/mod.rs | 2 +-
.../integration/tests/streaming/consumer_offset.rs | 2 +-
core/integration/tests/streaming/partition.rs | 2 +-
core/integration/tests/streaming/segment.rs | 2 +-
core/integration/tests/streaming/stream.rs | 2 +-
core/integration/tests/streaming/topic.rs | 2 +-
core/server/Cargo.toml | 5 +-
core/server/src/archiver/disk.rs | 16 +-
core/server/src/archiver/mod.rs | 5 +
core/server/src/archiver/s3.rs | 207 ++++++--
core/server/src/channels/commands/mod.rs | 1 -
core/server/src/channels/handler.rs | 24 +-
core/server/src/channels/server_command.rs | 15 +-
core/server/src/main.rs | 27 ++
core/server/src/server_error.rs | 8 +
core/server/src/shard/builder.rs | 9 +
core/server/src/shard/mod.rs | 16 +-
.../tasks/auxilary}/maintain_messages.rs | 342 +++++---------
core/server/src/shard/tasks/auxilary/mod.rs | 1 +
core/server/src/shard/tasks/mod.rs | 1 +
.../server/src/streaming/partitions/persistence.rs | 2 +-
.../src/streaming/segments/writing_messages.rs | 4 +-
core/server/src/streaming/utils/file.rs | 4 -
29 files changed, 735 insertions(+), 570 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index fce0c066..2a32b031 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -307,6 +307,15 @@ dependencies = [
"memchr",
]
+[[package]]
+name = "aligned-array"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e05c92d086290f52938013f6242ac62bf7d401fab8ad36798a609faa65c3fd2c"
+dependencies = [
+ "generic-array",
+]
+
[[package]]
name = "alloc-no-stdlib"
version = "2.0.4"
@@ -643,44 +652,12 @@ version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
-[[package]]
-name = "attohttpc"
-version = "0.28.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "07a9b245ba0739fc90935094c29adbaee3f977218b5fb95e822e261cda7f56a3"
-dependencies = [
- "http 1.3.1",
- "log",
- "rustls",
- "serde",
- "serde_json",
- "url",
- "webpki-roots 0.26.11",
-]
-
[[package]]
name = "autocfg"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
-[[package]]
-name = "aws-creds"
-version = "0.38.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ba912106484991c456adb3364338a2534d0818bd9374b324b608074e3b55f581"
-dependencies = [
- "attohttpc",
- "home",
- "log",
- "quick-xml 0.32.0",
- "rust-ini",
- "serde",
- "thiserror 1.0.69",
- "time",
- "url",
-]
-
[[package]]
name = "aws-lc-rs"
version = "1.13.1"
@@ -704,15 +681,6 @@ dependencies = [
"fs_extra",
]
-[[package]]
-name = "aws-region"
-version = "0.26.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "73ae4ae7c45238b60af0a3b27ef2fcc7bd5b8fdcd8a6d679919558b40d3eff7a"
-dependencies = [
- "thiserror 1.0.69",
-]
-
[[package]]
name = "axum"
version = "0.7.9"
@@ -1317,15 +1285,6 @@ dependencies = [
"thiserror 2.0.12",
]
-[[package]]
-name = "castaway"
-version = "0.2.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0abae9be0aaf9ea96a3b1b8b1b55c602ca751eba1b1500220cea4ecbafe7c0d5"
-dependencies = [
- "rustversion",
-]
-
[[package]]
name = "cc"
version = "1.2.27"
@@ -1533,16 +1492,23 @@ dependencies = [
]
[[package]]
-name = "compact_str"
-version = "0.7.1"
+name = "compio"
+version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f86b9c4c00838774a6d902ef931eff7470720c51d90c2e32cfe15dc304737b3f"
+checksum = "713c6293af093c202ad318e8f7bdc1de1a36d7a793bb77f7fc6bd6f1788659a9"
dependencies = [
- "castaway",
- "cfg-if",
- "itoa",
- "ryu",
- "static_assertions",
+ "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-dispatcher",
+ "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-fs 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-io 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-log 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-net 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-process",
+ "compio-quic",
+ "compio-runtime 0.8.1
(registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-signal",
+ "compio-tls 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -1550,15 +1516,26 @@ name = "compio"
version = "0.15.0"
source =
"git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be#fe4243f0b6811ebc325afd081c9b087b4d9817be"
dependencies = [
- "compio-buf",
- "compio-driver",
- "compio-fs",
- "compio-io",
- "compio-log",
+ "compio-buf 0.6.0
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-driver 0.8.1
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-fs 0.8.0
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-io 0.7.0
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-log 0.1.0
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
"compio-macros",
- "compio-net",
- "compio-runtime",
- "compio-tls",
+ "compio-net 0.8.0
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-runtime 0.8.1
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-tls 0.6.0
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+]
+
+[[package]]
+name = "compio-buf"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3ce94a45a47ef8c0e3f44084fe67c8effc25e7ac1de6de2ee1a29a59e6c6ba8e"
+dependencies = [
+ "arrayvec",
+ "bytes",
+ "libc",
]
[[package]]
@@ -1571,6 +1548,43 @@ dependencies = [
"libc",
]
+[[package]]
+name = "compio-dispatcher"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0cdf8c613be826be410d8744ab30acc49cc5134a78e2aa25efae9efa44bed6a7"
+dependencies = [
+ "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-runtime 0.8.1
(registry+https://github.com/rust-lang/crates.io-index)",
+ "flume",
+ "futures-channel",
+]
+
+[[package]]
+name = "compio-driver"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "737212fe00b4af769f7e8f156c25ffafd5888d4d21834e100ea068dea1086ef8"
+dependencies = [
+ "aligned-array",
+ "cfg-if",
+ "cfg_aliases",
+ "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-log 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-channel",
+ "crossbeam-queue",
+ "futures-util",
+ "io-uring",
+ "io_uring_buf_ring",
+ "libc",
+ "once_cell",
+ "paste",
+ "polling",
+ "slab",
+ "socket2 0.5.10",
+ "windows-sys 0.52.0",
+]
+
[[package]]
name = "compio-driver"
version = "0.8.1"
@@ -1578,8 +1592,8 @@ source =
"git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd
dependencies = [
"cfg-if",
"cfg_aliases",
- "compio-buf",
- "compio-log",
+ "compio-buf 0.6.0
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-log 0.1.0
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
"crossbeam-channel",
"crossbeam-queue",
"futures-util",
@@ -1594,6 +1608,24 @@ dependencies = [
"windows-sys 0.60.2",
]
+[[package]]
+name = "compio-fs"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9bcf65e631d521c666bca25595f8e5c78173e96f0b3b61f0a7d93f31d9661d32"
+dependencies = [
+ "cfg-if",
+ "cfg_aliases",
+ "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-io 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-runtime 0.8.1
(registry+https://github.com/rust-lang/crates.io-index)",
+ "libc",
+ "os_pipe",
+ "widestring",
+ "windows-sys 0.52.0",
+]
+
[[package]]
name = "compio-fs"
version = "0.8.0"
@@ -1601,27 +1633,48 @@ source =
"git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd
dependencies = [
"cfg-if",
"cfg_aliases",
- "compio-buf",
- "compio-driver",
- "compio-io",
- "compio-runtime",
+ "compio-buf 0.6.0
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-driver 0.8.1
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-io 0.7.0
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-runtime 0.8.1
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
"libc",
"os_pipe",
"widestring",
"windows-sys 0.60.2",
]
+[[package]]
+name = "compio-io"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c2b05cc4142659f2c90b6e44c68568ff71c83c6fb9285aca686952250b914932"
+dependencies = [
+ "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-util",
+ "paste",
+ "pin-project-lite",
+]
+
[[package]]
name = "compio-io"
version = "0.7.0"
source =
"git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be#fe4243f0b6811ebc325afd081c9b087b4d9817be"
dependencies = [
- "compio-buf",
+ "compio-buf 0.6.0
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
"futures-util",
"paste",
"pin-project-lite",
]
+[[package]]
+name = "compio-log"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fc4e560213c1996b618da369b7c9109564b41af9033802ae534465c4ee4e132f"
+dependencies = [
+ "tracing",
+]
+
[[package]]
name = "compio-log"
version = "0.1.0"
@@ -1641,16 +1694,35 @@ dependencies = [
"syn 2.0.104",
]
+[[package]]
+name = "compio-net"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0c1fabe3393bc0c3a0dca8e99a35bf97e42caa12bb3cc6bba83df04e28c9c142"
+dependencies = [
+ "cfg-if",
+ "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-io 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-runtime 0.8.1
(registry+https://github.com/rust-lang/crates.io-index)",
+ "either",
+ "libc",
+ "once_cell",
+ "socket2 0.5.10",
+ "widestring",
+ "windows-sys 0.52.0",
+]
+
[[package]]
name = "compio-net"
version = "0.8.0"
source =
"git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be#fe4243f0b6811ebc325afd081c9b087b4d9817be"
dependencies = [
"cfg-if",
- "compio-buf",
- "compio-driver",
- "compio-io",
- "compio-runtime",
+ "compio-buf 0.6.0
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-driver 0.8.1
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-io 0.7.0
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-runtime 0.8.1
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
"either",
"libc",
"once_cell",
@@ -1659,6 +1731,64 @@ dependencies = [
"windows-sys 0.60.2",
]
+[[package]]
+name = "compio-process"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3867cfe7b23eaae89ff815aba4fdde61cb6fd55f81fd368128300c6b7e645016"
+dependencies = [
+ "cfg-if",
+ "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-io 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-runtime 0.8.1
(registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-util",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-quic"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6f107e044329f1e171930801b09bfc6e764c5e171e45c7a3e382f98561da619a"
+dependencies = [
+ "cfg_aliases",
+ "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-io 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-log 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-net 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-runtime 0.8.1
(registry+https://github.com/rust-lang/crates.io-index)",
+ "flume",
+ "futures-util",
+ "libc",
+ "quinn-proto",
+ "rustc-hash 2.1.1",
+ "rustls",
+ "thiserror 2.0.12",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-runtime"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b7df559e87b7ab05ba61c32619f6076dd5cc2daf5a8cb30cb9931fb355d20aff"
+dependencies = [
+ "async-task",
+ "cfg-if",
+ "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-log 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-queue",
+ "futures-util",
+ "libc",
+ "once_cell",
+ "scoped-tls",
+ "slab",
+ "socket2 0.5.10",
+ "windows-sys 0.52.0",
+]
+
[[package]]
name = "compio-runtime"
version = "0.8.1"
@@ -1666,9 +1796,9 @@ source =
"git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd
dependencies = [
"async-task",
"cfg-if",
- "compio-buf",
- "compio-driver",
- "compio-log",
+ "compio-buf 0.6.0
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-driver 0.8.1
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-log 0.1.0
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
"core_affinity",
"crossbeam-queue",
"futures-util",
@@ -1680,13 +1810,40 @@ dependencies = [
"windows-sys 0.60.2",
]
+[[package]]
+name = "compio-signal"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "03d2931880b03b33d4df7d2b8a008e93731366d185358c7442fc8d24d5f9c1bd"
+dependencies = [
+ "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-runtime 0.8.1
(registry+https://github.com/rust-lang/crates.io-index)",
+ "libc",
+ "once_cell",
+ "os_pipe",
+ "slab",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-tls"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "542bb0e0f6f65cb84bc09b7e052fa54f006d1ba228a8dfad6d7b9676defe7232"
+dependencies = [
+ "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-io 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rustls",
+]
+
[[package]]
name = "compio-tls"
version = "0.6.0"
source =
"git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be#fe4243f0b6811ebc325afd081c9b087b4d9817be"
dependencies = [
- "compio-buf",
- "compio-io",
+ "compio-buf 0.6.0
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-io 0.7.0
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
"rustls",
]
@@ -2102,6 +2259,46 @@ dependencies = [
"regex-syntax 0.7.5",
]
+[[package]]
+name = "cyper"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "89b65af5073b4f53c9697b611b414042e71c6a14e11088438a67b1ef36f51ca2"
+dependencies = [
+ "async-stream",
+ "base64 0.22.1",
+ "compio 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "cyper-core",
+ "encoding_rs",
+ "futures-util",
+ "http 1.3.1",
+ "http-body-util",
+ "hyper",
+ "hyper-util",
+ "mime",
+ "send_wrapper",
+ "serde",
+ "serde_urlencoded",
+ "thiserror 2.0.12",
+ "url",
+]
+
+[[package]]
+name = "cyper-core"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6343deaa569c748860d9afefab636648e7e6f9abdfc26b3b9dde327170ae6b2b"
+dependencies = [
+ "cfg-if",
+ "compio 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-util",
+ "hyper",
+ "hyper-util",
+ "rustls-platform-verifier 0.6.0",
+ "send_wrapper",
+ "tower-service",
+]
+
[[package]]
name = "darling"
version = "0.20.11"
@@ -4394,7 +4591,7 @@ dependencies = [
"async-trait",
"bytes",
"chrono",
- "compio",
+ "compio 0.15.0
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
"ctor",
"derive_more 2.0.1",
"env_logger",
@@ -4986,17 +5183,6 @@ version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
-[[package]]
-name = "maybe-async"
-version = "0.2.10"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5cf92c10c7e361d6b99666ec1c6f9805b0bea2c3bd8c78dc6fe98ac5bd78db11"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn 2.0.104",
-]
-
[[package]]
name = "md-5"
version = "0.10.6"
@@ -5007,12 +5193,6 @@ dependencies = [
"digest",
]
-[[package]]
-name = "md5"
-version = "0.7.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
-
[[package]]
name = "memchr"
version = "2.7.5"
@@ -5066,15 +5246,6 @@ dependencies = [
"unicase",
]
-[[package]]
-name = "minidom"
-version = "0.16.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e394a0e3c7ccc2daea3dffabe82f09857b6b510cb25af87d54bf3e910ac1642d"
-dependencies = [
- "rxml",
-]
-
[[package]]
name = "minimal-lexical"
version = "0.2.1"
@@ -6292,19 +6463,9 @@ dependencies = [
[[package]]
name = "quick-xml"
-version = "0.32.0"
+version = "0.37.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1d3a6e5838b60e0e8fa7a43f22ade549a37d61f8bdbe636d0d7816191de969c2"
-dependencies = [
- "memchr",
- "serde",
-]
-
-[[package]]
-name = "quick-xml"
-version = "0.36.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f7649a7b4df05aed9ea7ec6f628c67c9953a43869b8bc50929569b2999d443fe"
+checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb"
dependencies = [
"memchr",
"serde",
@@ -6345,7 +6506,7 @@ dependencies = [
"rustc-hash 2.1.1",
"rustls",
"rustls-pki-types",
- "rustls-platform-verifier",
+ "rustls-platform-verifier 0.5.3",
"slab",
"thiserror 2.0.12",
"tinyvec",
@@ -6633,14 +6794,12 @@ dependencies = [
"sync_wrapper",
"tokio",
"tokio-rustls",
- "tokio-util",
"tower 0.5.2",
"tower-http",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
- "wasm-streams",
"web-sys",
"webpki-roots 1.0.1",
]
@@ -6783,40 +6942,6 @@ dependencies = [
"trim-in-place",
]
-[[package]]
-name = "rust-s3"
-version = "0.36.0-beta.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cc7d6f3a3dd397743e8f344ffc80ea7137aee423983ae25b512e5332ad11362f"
-dependencies = [
- "async-trait",
- "aws-creds",
- "aws-region",
- "base64 0.22.1",
- "bytes",
- "cfg-if",
- "futures",
- "hex",
- "hmac",
- "http 1.3.1",
- "log",
- "maybe-async",
- "md5",
- "minidom",
- "percent-encoding",
- "quick-xml 0.36.2",
- "reqwest",
- "serde",
- "serde_derive",
- "serde_json",
- "sha2",
- "thiserror 1.0.69",
- "time",
- "tokio",
- "tokio-stream",
- "url",
-]
-
[[package]]
name = "rust_decimal"
version = "1.37.2"
@@ -6954,6 +7079,27 @@ dependencies = [
"windows-sys 0.59.0",
]
+[[package]]
+name = "rustls-platform-verifier"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eda84358ed17f1f354cf4b1909ad346e6c7bc2513e8c40eb08e0157aa13a9070"
+dependencies = [
+ "core-foundation",
+ "core-foundation-sys",
+ "jni",
+ "log",
+ "once_cell",
+ "rustls",
+ "rustls-native-certs",
+ "rustls-platform-verifier-android",
+ "rustls-webpki",
+ "security-framework",
+ "security-framework-sys",
+ "webpki-root-certs 1.0.1",
+ "windows-sys 0.52.0",
+]
+
[[package]]
name = "rustls-platform-verifier-android"
version = "0.1.1"
@@ -6979,22 +7125,22 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a0d197bd2c9dc6e53b84da9556a69ba4cdfab8619eb41a8bd1cc2027a0f6b1d"
[[package]]
-name = "rxml"
-version = "0.11.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "65bc94b580d0f5a6b7a2d604e597513d3c673154b52ddeccd1d5c32360d945ee"
-dependencies = [
- "bytes",
- "rxml_validation",
-]
-
-[[package]]
-name = "rxml_validation"
-version = "0.11.0"
+name = "rusty-s3"
+version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "826e80413b9a35e9d33217b3dcac04cf95f6559d15944b93887a08be5496c4a4"
+checksum = "8f51a5a6b15f25d3e10c068039ee13befb6110fcb36c2b26317bcbdc23484d96"
dependencies = [
- "compact_str",
+ "base64 0.22.1",
+ "hmac",
+ "md-5",
+ "percent-encoding",
+ "quick-xml",
+ "serde",
+ "serde_json",
+ "sha2",
+ "time",
+ "url",
+ "zeroize",
]
[[package]]
@@ -7128,6 +7274,15 @@ dependencies = [
"serde",
]
+[[package]]
+name = "send_wrapper"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73"
+dependencies = [
+ "futures-core",
+]
+
[[package]]
name = "serde"
version = "1.0.219"
@@ -7300,10 +7455,11 @@ dependencies = [
"bytes",
"chrono",
"clap",
- "compio",
+ "compio 0.15.0
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
"console-subscriber",
"crossbeam",
"ctrlc",
+ "cyper",
"dashmap",
"derive_more 2.0.1",
"dotenvy",
@@ -7332,9 +7488,9 @@ dependencies = [
"quinn",
"reqwest",
"ring",
- "rust-s3",
"rustls",
"rustls-pemfile",
+ "rusty-s3",
"serde",
"serde_with",
"serial_test",
@@ -7789,12 +7945,6 @@ dependencies = [
"toml",
]
-[[package]]
-name = "static_assertions"
-version = "1.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
-
[[package]]
name = "stringprep"
version = "0.1.5"
@@ -8914,19 +9064,6 @@ dependencies = [
"unicode-ident",
]
-[[package]]
-name = "wasm-streams"
-version = "0.4.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65"
-dependencies = [
- "futures-util",
- "js-sys",
- "wasm-bindgen",
- "wasm-bindgen-futures",
- "web-sys",
-]
-
[[package]]
name = "wasm-timer"
version = "0.2.5"
diff --git a/Cargo.toml b/Cargo.toml
index 411d75b9..1faea63f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -120,6 +120,7 @@ tempfile = "3.20.0"
thiserror = "2.0.12"
tokio = { version = "1.45.1", features = ["full"] }
compio = { git = "https://github.com/compio-rs/compio.git", rev = "fe4243f0b6811ebc325afd081c9b087b4d9817be", features = ["runtime", "macros", "io-uring", "time", "rustls"] }
+cyper = { version = "0.4.0", features = ["rustls"], default-features = false}
tokio-rustls = "0.26.2"
toml = "0.8.23"
tracing = "0.1.41"
diff --git a/core/configs/server.toml b/core/configs/server.toml
index e235baa8..7aba9ec1 100644
--- a/core/configs/server.toml
+++ b/core/configs/server.toml
@@ -17,7 +17,7 @@
[data_maintenance.archiver]
# Enables or disables the archiver process.
-enabled = false
+enabled = true
# Kind of archiver to use. Available options: "disk".
kind = "disk"
@@ -47,13 +47,13 @@ tmp_upload_dir = "local_data/s3_tmp"
[data_maintenance.messages]
# Enables or disables the archiver process for closed segments containing messages.
-archiver_enabled = false
+archiver_enabled = true
# Enables or disables the expired message cleaner process.
cleaner_enabled = false
# Interval for running the message archiver and cleaner.
-interval = "1 m"
+interval = "35s"
[data_maintenance.state]
# Enables or disables the archiver process for state log.
@@ -549,4 +549,4 @@ bucket_capacity = 8192
# - "all": Use all available CPU cores (default)
# - numeric value (e.g. 4): Use 4 shards (4 threads pinned to cores 0, 1, 2, 3)
# - range (e.g. "5..8"): Use 3 shards with affinity to cores 5, 6, 7
-cpu_allocation = "all"
+cpu_allocation = "4"
diff --git a/core/integration/tests/archiver/disk.rs
b/core/integration/tests/archiver/disk.rs
index 00901190..d99ce8ef 100644
--- a/core/integration/tests/archiver/disk.rs
+++ b/core/integration/tests/archiver/disk.rs
@@ -17,12 +17,12 @@
*/
use crate::archiver::DiskArchiverSetup;
+use compio::io::{AsyncReadAtExt, AsyncWriteAtExt};
use server::streaming::utils::file;
use server::{archiver::Archiver, server_error::ArchiverError};
use std::path::Path;
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
-#[tokio::test]
+#[compio::test]
async fn should_init_base_archiver_directory() {
let setup = DiskArchiverSetup::init().await;
let archiver = setup.archiver();
@@ -32,7 +32,7 @@ async fn should_init_base_archiver_directory() {
assert!(path.exists());
}
-#[tokio::test]
+#[compio::test]
async fn should_archive_file_on_disk_by_making_a_copy_of_original_file() {
let setup = DiskArchiverSetup::init().await;
let archiver = setup.archiver();
@@ -47,7 +47,7 @@ async fn
should_archive_file_on_disk_by_making_a_copy_of_original_file() {
assert_archived_file(&file_to_archive_path, &archived_file_path, content).await;
}
-#[tokio::test]
+#[compio::test]
async fn should_archive_file_on_disk_within_additional_base_directory() {
let setup = DiskArchiverSetup::init().await;
let archiver = setup.archiver();
@@ -68,7 +68,7 @@ async fn
should_archive_file_on_disk_within_additional_base_directory() {
assert_archived_file(&file_to_archive_path, &archived_file_path, content).await;
}
-#[tokio::test]
+#[compio::test]
async fn should_return_true_when_file_is_archived() {
let setup = DiskArchiverSetup::init().await;
let archiver = setup.archiver();
@@ -83,7 +83,7 @@ async fn should_return_true_when_file_is_archived() {
assert!(is_archived.unwrap());
}
-#[tokio::test]
+#[compio::test]
async fn should_return_false_when_file_is_not_archived() {
let setup = DiskArchiverSetup::init().await;
let archiver = setup.archiver();
@@ -96,7 +96,7 @@ async fn should_return_false_when_file_is_not_archived() {
assert!(!is_archived.unwrap());
}
-#[tokio::test]
+#[compio::test]
async fn should_fail_when_file_to_archive_does_not_exist() {
let setup = DiskArchiverSetup::init().await;
let archiver = setup.archiver();
@@ -110,26 +110,23 @@ async fn
should_fail_when_file_to_archive_does_not_exist() {
}
async fn create_file(path: &str, content: &str) {
- // TODO: Fixme
- /*
let mut file = file::overwrite(path).await.unwrap();
- file.write_all(content.as_bytes()).await.unwrap();
- */
+ let content = content.as_bytes().to_vec();
+ file.write_all_at(content, 0).await.unwrap();
}
async fn assert_archived_file(file_to_archive_path: &str, archived_file_path: &str, content: &str) {
- // TODO: Fixme
- /*
assert!(Path::new(&file_to_archive_path).exists());
assert!(Path::new(&archived_file_path).exists());
let archived_file = file::open(archived_file_path).await;
assert!(archived_file.is_ok());
- let mut archived_file = archived_file.unwrap();
- let mut archived_file_content = String::new();
- archived_file
- .read_to_string(&mut archived_file_content)
+ let archived_file = archived_file.unwrap();
+ let len = archived_file.metadata().await.unwrap().len();
+ let archived_file_content = Vec::with_capacity(len as usize);
+ let (_, archived_file_content) = archived_file
+ .read_exact_at(archived_file_content, 0)
.await
.unwrap();
+ let archived_file_content = String::from_utf8(archived_file_content).unwrap();
assert_eq!(content, archived_file_content);
- */
}
diff --git a/core/integration/tests/archiver/mod.rs
b/core/integration/tests/archiver/mod.rs
index 2e3ed94c..704936e9 100644
--- a/core/integration/tests/archiver/mod.rs
+++ b/core/integration/tests/archiver/mod.rs
@@ -16,13 +16,12 @@
* under the License.
*/
+use compio::fs::create_dir;
use server::archiver::disk::DiskArchiver;
use server::configs::server::DiskArchiverConfig;
-use tokio::fs::create_dir;
use uuid::Uuid;
mod disk;
-mod s3;
pub struct DiskArchiverSetup {
base_path: String,
diff --git a/core/integration/tests/archiver/s3.rs
b/core/integration/tests/archiver/s3.rs
deleted file mode 100644
index df2970f1..00000000
--- a/core/integration/tests/archiver/s3.rs
+++ /dev/null
@@ -1,38 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use server::archiver::Archiver;
-use server::archiver::s3::S3Archiver;
-use server::configs::server::S3ArchiverConfig;
-
-#[tokio::test]
-async fn should_not_be_initialized_given_invalid_configuration() {
- let config = S3ArchiverConfig {
- key_id: "test".to_owned(),
- key_secret: "secret".to_owned(),
- bucket: "iggy".to_owned(),
- endpoint: Some("https://iggy.s3.com".to_owned()),
- region: None,
- tmp_upload_dir: "tmp".to_owned(),
- };
- let archiver = S3Archiver::new(config);
- assert!(archiver.is_ok());
- let archiver = archiver.unwrap();
- let init = archiver.init().await;
- assert!(init.is_err());
-}
diff --git a/core/integration/tests/state/mod.rs
b/core/integration/tests/state/mod.rs
index 2071587f..fe496abd 100644
--- a/core/integration/tests/state/mod.rs
+++ b/core/integration/tests/state/mod.rs
@@ -16,6 +16,7 @@
* under the License.
*/
+use compio::fs::create_dir;
use iggy::prelude::{Aes256GcmEncryptor, EncryptorKind};
use server::bootstrap::create_directories;
use server::state::file::FileState;
@@ -24,7 +25,6 @@ use server::streaming::utils::file::overwrite;
use server::versioning::SemanticVersion;
use std::str::FromStr;
use std::sync::Arc;
-use compio::fs::create_dir;
use uuid::Uuid;
mod file;
diff --git a/core/integration/tests/streaming/consumer_offset.rs
b/core/integration/tests/streaming/consumer_offset.rs
index 84fc9749..db89169b 100644
--- a/core/integration/tests/streaming/consumer_offset.rs
+++ b/core/integration/tests/streaming/consumer_offset.rs
@@ -17,12 +17,12 @@
*/
use crate::streaming::common::test_setup::TestSetup;
+use compio::fs;
use iggy::prelude::ConsumerKind;
use server::configs::system::SystemConfig;
use server::streaming::partitions::partition::ConsumerOffset;
use server::streaming::storage::PartitionStorageKind;
use std::sync::Arc;
-use compio::fs;
#[compio::test]
async fn should_persist_consumer_offsets_and_then_load_them_from_disk() {
diff --git a/core/integration/tests/streaming/partition.rs
b/core/integration/tests/streaming/partition.rs
index 828532f9..ac657e3b 100644
--- a/core/integration/tests/streaming/partition.rs
+++ b/core/integration/tests/streaming/partition.rs
@@ -18,13 +18,13 @@
use crate::streaming::common::test_setup::TestSetup;
use crate::streaming::create_messages;
+use compio::fs;
use iggy::prelude::{IggyExpiry, IggyTimestamp, Sizeable};
use server::state::system::PartitionState;
use server::streaming::partitions::partition::Partition;
use server::streaming::segments::*;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64};
-use compio::fs;
#[compio::test]
async fn should_persist_partition_with_segment() {
diff --git a/core/integration/tests/streaming/segment.rs
b/core/integration/tests/streaming/segment.rs
index 0ee68893..4e097b8f 100644
--- a/core/integration/tests/streaming/segment.rs
+++ b/core/integration/tests/streaming/segment.rs
@@ -347,7 +347,7 @@ async fn should_delete_persisted_segments() -> Result<(),
Box<dyn std::error::Er
shard.add_active_session(session.clone());
let id = shard
- .create_stream_bypass_auth( Some(stream_id.get_u32_value()?),
stream_name)
+ .create_stream_bypass_auth(Some(stream_id.get_u32_value()?),
stream_name)
.unwrap();
let stream = shard.get_stream(&Identifier::numeric(id).unwrap()).unwrap();
stream.persist();
diff --git a/core/integration/tests/streaming/stream.rs
b/core/integration/tests/streaming/stream.rs
index ff360632..8d7b7cb8 100644
--- a/core/integration/tests/streaming/stream.rs
+++ b/core/integration/tests/streaming/stream.rs
@@ -19,12 +19,12 @@
use crate::streaming::common::test_setup::TestSetup;
use crate::streaming::create_messages;
use ahash::AHashMap;
+use compio::fs;
use iggy::prelude::*;
use server::state::system::StreamState;
use server::streaming::polling_consumer::PollingConsumer;
use server::streaming::segments::IggyMessagesBatchMut;
use server::streaming::streams::stream::Stream;
-use compio::fs;
#[compio::test]
async fn should_persist_stream_with_topics_directory_and_info_file() {
diff --git a/core/integration/tests/streaming/topic.rs
b/core/integration/tests/streaming/topic.rs
index 0e8812f9..37c4560b 100644
--- a/core/integration/tests/streaming/topic.rs
+++ b/core/integration/tests/streaming/topic.rs
@@ -19,6 +19,7 @@
use crate::streaming::common::test_setup::TestSetup;
use crate::streaming::create_messages;
use ahash::AHashMap;
+use compio::fs;
use iggy::prelude::*;
use server::state::system::{PartitionState, TopicState};
use server::streaming::polling_consumer::PollingConsumer;
@@ -27,7 +28,6 @@ use server::streaming::topics::topic::Topic;
use std::default::Default;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64};
-use compio::fs;
#[compio::test]
async fn should_persist_topics_with_partitions_directories_and_info_file() {
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index b171652d..fa7fc48f 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -26,7 +26,7 @@ license = "Apache-2.0"
normal = ["tracing-appender"]
[package.metadata.cargo-machete]
-ignored = ["rust-s3"]
+ignored = ["rusty-s3"]
[features]
default = ["mimalloc"]
@@ -94,7 +94,6 @@ prometheus-client = "0.23.1"
quinn = { workspace = true }
reqwest = { workspace = true, features = ["rustls-tls-no-provider"] }
ring = "0.17.14"
-rust-s3 = { workspace = true }
rustls = { workspace = true }
rustls-pemfile = "2.2.0"
serde = { workspace = true }
@@ -107,6 +106,7 @@ tempfile = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
compio = { workspace = true }
+cyper = { workspace = true }
tokio-rustls = { workspace = true }
tokio-util = { workspace = true }
toml = { workspace = true }
@@ -118,6 +118,7 @@ tracing-subscriber = { workspace = true }
twox-hash = { workspace = true }
ulid = "1.2.1"
uuid = { workspace = true }
+rusty-s3 = "0.7.0"
[build-dependencies]
figment = { version = "0.10.19", features = ["json", "toml", "env"] }
diff --git a/core/server/src/archiver/disk.rs b/core/server/src/archiver/disk.rs
index 0eafa20d..8873fef1 100644
--- a/core/server/src/archiver/disk.rs
+++ b/core/server/src/archiver/disk.rs
@@ -19,10 +19,12 @@
use crate::archiver::{Archiver, COMPONENT};
use crate::configs::server::DiskArchiverConfig;
use crate::server_error::ArchiverError;
+use crate::streaming::utils::file;
+use compio::fs;
+use compio::io::copy;
use error_set::ErrContext;
use std::path::Path;
-use tokio::fs;
-use tracing::{debug, info};
+use tracing::{debug, error, info};
#[derive(Debug)]
pub struct DiskArchiver {
@@ -70,8 +72,7 @@ impl Archiver for DiskArchiver {
files: &[&str],
base_directory: Option<String>,
) -> Result<(), ArchiverError> {
- //TODO: Fixme figure this out, we can't use tokio methods there.
- /* debug!("Archiving files on disk: {:?}", files);
+ debug!("Archiving files on disk: {:?}", files);
for file in files {
debug!("Archiving file: {file}");
let source = Path::new(file);
@@ -89,12 +90,15 @@ impl Archiver for DiskArchiver {
.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to create
file: {file} at path: {destination_path}",)
})?;
- fs::copy(source, destination).await.with_error_context(|error| {
+ let source = file::open(file).await.unwrap();
+ let mut source = std::io::Cursor::new(source);
+ let (destination, _) =
file::append(&destination_path).await.unwrap();
+ let mut destination = std::io::Cursor::new(destination);
+ copy(&mut source, &mut
destination).await.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to copy file:
{file} to destination: {destination_path}")
})?;
debug!("Archived file: {file} at: {destination_path}");
}
- */
Ok(())
}
}
diff --git a/core/server/src/archiver/mod.rs b/core/server/src/archiver/mod.rs
index 6d61dcad..6a6f6f50 100644
--- a/core/server/src/archiver/mod.rs
+++ b/core/server/src/archiver/mod.rs
@@ -30,6 +30,11 @@ use std::str::FromStr;
use crate::archiver::disk::DiskArchiver;
use crate::archiver::s3::S3Archiver;
+pub(crate) struct PutObjectStreamResponse {
+ status: u16,
+ total_size: usize,
+}
+pub(crate) type Region = String;
pub const COMPONENT: &str = "ARCHIVER";
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Default, Display, Copy,
Clone)]
diff --git a/core/server/src/archiver/s3.rs b/core/server/src/archiver/s3.rs
index bbc9cd3b..4b97a614 100644
--- a/core/server/src/archiver/s3.rs
+++ b/core/server/src/archiver/s3.rs
@@ -16,23 +16,39 @@
* under the License.
*/
-use crate::archiver::{Archiver, COMPONENT};
+use crate::archiver::{Archiver, COMPONENT, PutObjectStreamResponse, Region};
use crate::configs::server::S3ArchiverConfig;
+use crate::io;
use crate::server_error::ArchiverError;
-use crate::streaming::utils::file;
+use crate::streaming::utils::{PooledBuffer, file};
+use bytes::BytesMut;
+use compio::buf::{IntoInner, IoBuf};
+use compio::fs;
+use compio::io::{AsyncRead, AsyncReadExt, copy};
+use cyper::{Client, ClientBuilder};
use error_set::ErrContext;
-use s3::creds::Credentials;
-use s3::{Bucket, Region};
+use futures::future::try_join_all;
+use rusty_s3::actions::{CompleteMultipartUpload, CreateMultipartUpload,
GetObject, UploadPart};
+use rusty_s3::{Bucket, Credentials, S3Action, UrlStyle};
+use std::collections::HashMap;
+use std::io::Cursor;
+use std::ops::Deref;
use std::path::Path;
-use tokio::fs;
+use std::sync::Arc;
+use std::time::Duration;
use tracing::{debug, error, info};
#[derive(Debug)]
pub struct S3Archiver {
+ client: Client,
bucket: Bucket,
+ credentials: Credentials,
tmp_upload_dir: String,
+ expiration: Duration,
}
+pub const CHUNK_SIZE: usize = 8_388_608; // 8 Mebibytes, min is 5 (5_242_880);
+
impl S3Archiver {
/// Creates a new S3 archiver.
///
@@ -40,26 +56,25 @@ impl S3Archiver {
///
/// Returns an error if the S3 client cannot be initialized or credentials
are invalid.
pub fn new(config: S3ArchiverConfig) -> Result<Self, ArchiverError> {
- let credentials = Credentials::new(
- Some(&config.key_id),
- Some(&config.key_secret),
- None,
- None,
- None,
- )
- .map_err(|_| ArchiverError::InvalidS3Credentials)?;
-
- let bucket = Bucket::new(
- &config.bucket,
- Region::Custom {
- endpoint: config.endpoint.map_or_else(String::new, |e| e),
- region: config.region.map_or_else(String::new, |r| r),
- },
- credentials,
- )
- .map_err(|_| ArchiverError::CannotInitializeS3Archiver)?;
+ let credentials = Credentials::new(&config.key_id, &config.key_secret);
+ let region: Region = config.region.map_or_else(String::new, |r| r);
+ let endpoint = config
+ .endpoint
+ .map_or_else(String::new, |e| e)
+ .parse()
+ .expect("Endpoint should be valid URL");
+ let path_style = UrlStyle::VirtualHost;
+ let name = config.bucket;
+
+ let bucket = Bucket::new(endpoint, path_style, name, region)?;
+ //TODO: Make this configurable ?
+ let expiration = Duration::from_secs(60);
+ let client = ClientBuilder::new().use_rustls_default().build();
Ok(Self {
- bucket: *bucket,
+ bucket,
+ client,
+ credentials,
+ expiration,
tmp_upload_dir: config.tmp_upload_dir,
})
}
@@ -69,9 +84,10 @@ impl S3Archiver {
"Copying file: {path} to temporary S3 upload directory: {}",
self.tmp_upload_dir
);
- let source = Path::new(path);
+ let source = file::open(&path).await?;
+ let mut source = std::io::Cursor::new(source);
let destination = Path::new(&self.tmp_upload_dir).join(path);
- let destination_path =
destination.to_str().unwrap_or_default().to_owned();
+ let destination_path = self.tmp_upload_dir.to_owned();
debug!("Creating temporary S3 upload directory: {destination_path}");
fs::create_dir_all(destination.parent().expect("Path should have a
parent directory"))
.await
@@ -80,18 +96,123 @@ impl S3Archiver {
"{COMPONENT} (error: {error}) - failed to create temporary
S3 upload directory for path: {destination_path}"
)
})?;
+ let destination = file::open(&self.tmp_upload_dir).await?;
+ let mut destination = std::io::Cursor::new(destination);
debug!("Copying file: {path} to temporary S3 upload path:
{destination_path}");
- fs::copy(source, &destination).await.with_error_context(|error| {
+ copy(&mut source, &mut destination).await.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to copy file:
{path} to temporary S3 upload path: {destination_path}")
})?;
debug!("File: {path} copied to temporary S3 upload path:
{destination_path}");
Ok(destination_path)
}
+
+ async fn put_object_stream(
+ &self,
+ reader: &mut impl AsyncReadExt,
+ destination_path: &str,
+ ) -> Result<PutObjectStreamResponse, ArchiverError> {
+ let buf = BytesMut::with_capacity(CHUNK_SIZE);
+ let (result, chunk) =
reader.read_exact(buf.slice(..CHUNK_SIZE)).await.into();
+ result?;
+ let buf = chunk.into_inner().freeze();
+ if buf.len() < CHUNK_SIZE {
+ // Normal upload
+ let action = self
+ .bucket
+ .put_object(Some(&self.credentials), destination_path);
+ let url = action.sign(self.expiration);
+ let buf_len = buf.len();
+ let response = self.client.put(url)?.body(buf).send().await?;
+ return Ok(PutObjectStreamResponse {
+ status: response.status().as_u16(),
+ total_size: buf_len,
+ });
+ }
+
+ let action = self
+ .bucket
+ .create_multipart_upload(Some(&self.credentials),
destination_path);
+ let url = action.sign(self.expiration);
+ let response = self.client.post(url)?.send().await?;
+ let body = response.text().await?;
+ let response =
+ CreateMultipartUpload::parse_response(&body).expect("Failed to
parse response");
+ let upload_id = response.upload_id();
+
+ let mut total_size = 0;
+ let mut part_number = 0;
+ let mut handles = Vec::new();
+ let action = self.bucket.upload_part(
+ Some(&self.credentials),
+ destination_path,
+ part_number,
+ upload_id,
+ );
+ let url = action.sign(self.expiration);
+ let buf_size = buf.len();
+ let response = self.client.put(url)?.body(buf).send();
+ total_size += buf_size;
+ part_number += 1;
+ handles.push(response);
+ loop {
+ let buf = BytesMut::with_capacity(CHUNK_SIZE);
+ let (result, chunk) =
reader.read_exact(buf.slice(..CHUNK_SIZE)).await.into();
+ result?;
+ let buf = chunk.into_inner().freeze();
+ let buf_len = buf.len();
+ let done = buf_len < CHUNK_SIZE;
+ let action = self.bucket.upload_part(
+ Some(&self.credentials),
+ destination_path,
+ part_number,
+ upload_id,
+ );
+ let url = action.sign(self.expiration);
+ let response = self.client.put(url)?.body(buf).send();
+ part_number += 1;
+ total_size += buf_len;
+
+ handles.push(response);
+ if done {
+ break;
+ }
+ }
+ let responses = try_join_all(handles).await?;
+ let mut etags = Vec::new();
+ for response in responses {
+ let status_code = response.status().as_u16();
+ if status_code == 200 {
+ let etag = response
+ .headers()
+ .get("etag")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_owned();
+ etags.push(etag);
+ }
+ }
+ let action = self.bucket.complete_multipart_upload(
+ Some(&self.credentials),
+ destination_path,
+ upload_id,
+ etags.iter().map(|s| s.as_str()),
+ );
+ let url = action.sign(self.expiration);
+ let body = CompleteMultipartUpload::body(action);
+ let response = self.client.post(url)?.body(body).send().await?;
+ Ok(PutObjectStreamResponse {
+ status: response.status().as_u16(),
+ total_size,
+ })
+ }
}
impl Archiver for S3Archiver {
async fn init(&self) -> Result<(), ArchiverError> {
- let response = self.bucket.list("/".to_string(), None).await;
+ let action = self.bucket.list_objects_v2(Some(&self.credentials));
+ let url = action.sign(self.expiration);
+ let response = self.client.get(url)?.send().await;
if let Err(error) = response {
error!("Cannot initialize S3 archiver: {error}");
return Err(ArchiverError::CannotInitializeS3Archiver);
@@ -102,7 +223,7 @@ impl Archiver for S3Archiver {
"Removing existing S3 archiver temporary upload directory: {}",
self.tmp_upload_dir
);
- fs::remove_dir_all(&self.tmp_upload_dir).await?;
+ io::fs_utils::remove_dir_all(&self.tmp_upload_dir).await?;
}
info!(
"Creating S3 archiver temporary upload directory: {}",
@@ -121,14 +242,19 @@ impl Archiver for S3Archiver {
let base_directory = base_directory.as_deref().unwrap_or_default();
let destination = Path::new(&base_directory).join(file);
let destination_path =
destination.to_str().unwrap_or_default().to_owned();
- let response = self.bucket.get_object_tagging(destination_path).await;
+ let mut object_tagging = self
+ .bucket
+ .get_object(Some(&self.credentials), &destination_path);
+ object_tagging.query_mut().insert("tagging", "");
+ let url = object_tagging.sign(self.expiration);
+ let response = self.client.get(url)?.send().await;
if response.is_err() {
debug!("File: {file} is not archived on S3.");
return Ok(false);
}
- let (_, status) = response.expect("Response should be valid if not an
error");
- if status == 200 {
+ let response = response.expect("Response should be valid if not an
error");
+ if response.status() == 200 {
debug!("File: {file} is archived on S3.");
return Ok(true);
}
@@ -151,19 +277,15 @@ impl Archiver for S3Archiver {
let source = self.copy_file_to_tmp(path).await?;
debug!("Archiving file: {source} on S3.");
- let mut file = file::open(&source)
+ let file = file::open(&source)
.await
.with_error_context(|error| format!("{COMPONENT} (error:
{error}) - failed to open source file: {source} for archiving"))?;
+ let mut reader = std::io::Cursor::new(file);
let base_directory = base_directory.as_deref().unwrap_or_default();
let destination = Path::new(&base_directory).join(path);
let destination_path =
destination.to_str().unwrap_or_default().to_owned();
- // TODO: Fixme figure this out.
- // The `put_object_stream` method requires `AsyncRead` trait from
tokio as its reader.
- /*
- let response = self
- .bucket
- .put_object_stream(&mut file, destination_path)
- .await;
+ // Egh.. multi part upload.
+ let response = self.put_object_stream(&mut reader,
&destination_path).await;
if let Err(error) = response {
error!("Cannot archive file: {path} on S3: {}", error);
fs::remove_file(&source).await.with_error_context(|error| {
@@ -173,9 +295,7 @@ impl Archiver for S3Archiver {
file_path: (*path).to_string(),
});
}
-
- let response = response.expect("Response should be valid if not an
error");
- let status = response.status_code();
+ let PutObjectStreamResponse { status, total_size } =
response.unwrap();
if status == 200 {
debug!("Archived file: {path} on S3.");
fs::remove_file(&source).await.with_error_context(|error| {
@@ -191,7 +311,6 @@ impl Archiver for S3Archiver {
return Err(ArchiverError::CannotArchiveFile {
file_path: (*path).to_string(),
});
- */
}
Ok(())
}
diff --git a/core/server/src/channels/commands/mod.rs
b/core/server/src/channels/commands/mod.rs
index 464277bb..13b1bf91 100644
--- a/core/server/src/channels/commands/mod.rs
+++ b/core/server/src/channels/commands/mod.rs
@@ -18,7 +18,6 @@
pub mod archive_state;
pub mod clean_personal_access_tokens;
-pub mod maintain_messages;
pub mod print_sysinfo;
pub mod save_messages;
pub mod verify_heartbeats;
diff --git a/core/server/src/channels/handler.rs
b/core/server/src/channels/handler.rs
index 7612a259..ee44fd58 100644
--- a/core/server/src/channels/handler.rs
+++ b/core/server/src/channels/handler.rs
@@ -1,3 +1,5 @@
+use std::rc::Rc;
+
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,21 +17,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-
-//TODO: Fixme
-/*
use super::server_command::BackgroundServerCommand;
-use crate::configs::server::ServerConfig;
-use crate::streaming::systems::system::SharedSystem;
+use crate::{configs::server::ServerConfig, shard::IggyShard};
pub struct BackgroundServerCommandHandler<'a> {
- system: SharedSystem,
+ shard: Rc<IggyShard>,
config: &'a ServerConfig,
}
impl<'a> BackgroundServerCommandHandler<'a> {
- pub fn new(system: SharedSystem, config: &'a ServerConfig) -> Self {
- Self { system, config }
+ pub fn new(shard: Rc<IggyShard>, config: &'a ServerConfig) -> Self {
+ Self { shard, config }
}
pub fn install_handler<C, E>(&mut self, mut executor: E) -> Self
@@ -37,14 +35,12 @@ impl<'a> BackgroundServerCommandHandler<'a> {
E: BackgroundServerCommand<C> + Send + Sync + 'static,
{
let (sender, receiver) = flume::unbounded();
- let system = self.system.clone();
- executor.start_command_sender(system.clone(), self.config, sender);
- executor.start_command_consumer(system.clone(), self.config, receiver);
+ let shard = self.shard.clone();
+ executor.start_command_sender(shard.clone(), self.config, sender);
+ executor.start_command_consumer(shard.clone(), self.config, receiver);
Self {
- system,
+ shard,
config: self.config,
}
}
}
-
-*/
diff --git a/core/server/src/channels/server_command.rs
b/core/server/src/channels/server_command.rs
index 16b0dc2f..ac490737 100644
--- a/core/server/src/channels/server_command.rs
+++ b/core/server/src/channels/server_command.rs
@@ -16,29 +16,24 @@
* under the License.
*/
-//TODO: Fixme
-/*
-use crate::configs::server::ServerConfig;
-use crate::streaming::systems::system::SharedSystem;
+use crate::{configs::server::ServerConfig, shard::IggyShard};
use flume::{Receiver, Sender};
-use std::future::Future;
+use std::{future::Future, rc::Rc};
pub trait BackgroundServerCommand<C> {
- fn execute(&mut self, system: &SharedSystem, command: C) -> impl
Future<Output = ()>;
+ fn execute(&mut self, system: &IggyShard, command: C) -> impl
Future<Output = ()>;
fn start_command_sender(
&mut self,
- system: SharedSystem,
+ shard: Rc<IggyShard>,
config: &ServerConfig,
sender: Sender<C>,
);
fn start_command_consumer(
self,
- system: SharedSystem,
+ shard: Rc<IggyShard>,
config: &ServerConfig,
receiver: Receiver<C>,
);
}
-
-*/
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 2971347e..b1481c22 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -19,6 +19,7 @@
use std::collections::HashSet;
use std::rc::Rc;
use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
use anyhow::Result;
use clap::Parser;
@@ -28,6 +29,7 @@ use figlet_rs::FIGfont;
use iggy_common::create_user::CreateUser;
use iggy_common::defaults::DEFAULT_ROOT_USER_ID;
use iggy_common::{Aes256GcmEncryptor, EncryptorKind, IggyError};
+use server::archiver::{ArchiverKind, ArchiverKindType};
use server::args::Args;
use server::bootstrap::{
create_directories, create_root_user, create_shard_connections,
create_shard_executor,
@@ -172,6 +174,30 @@ fn main() -> Result<(), ServerError> {
)),
false => None,
};
+ let archiver_config = &config.data_maintenance.archiver;
+ let archiver: Option<ArchiverKind> = if
archiver_config.enabled {
+ info!("Archiving is enabled, kind: {}",
archiver_config.kind);
+ match archiver_config.kind {
+ ArchiverKindType::Disk =>
Some(ArchiverKind::get_disk_archiver(
+ archiver_config
+ .disk
+ .clone()
+ .expect("Disk archiver config is missing"),
+ )),
+ ArchiverKindType::S3 => Some(
+ ArchiverKind::get_s3_archiver(
+ archiver_config
+ .s3
+ .clone()
+ .expect("S3 archiver config is
missing"),
+ )
+ .expect("Failed to create S3 archiver"),
+ ),
+ }
+ } else {
+ info!("Archiving is disabled.");
+ None
+ };
let state = StateKind::File(FileState::new(
&config.system.get_state_messages_file_path(),
@@ -251,6 +277,7 @@ fn main() -> Result<(), ServerError> {
.id(id)
.connections(connections)
.config(config)
+ .archiver(archiver)
.encryptor(encryptor)
.version(version)
.state(state)
diff --git a/core/server/src/server_error.rs b/core/server/src/server_error.rs
index 16e2ef92..ca29a627 100644
--- a/core/server/src/server_error.rs
+++ b/core/server/src/server_error.rs
@@ -18,6 +18,7 @@
use error_set::error_set;
use quinn::{ConnectionError as QuicConnectionError, ReadToEndError,
WriteError};
+use rusty_s3::BucketError;
use std::array::TryFromSliceError;
use tokio::io;
@@ -45,6 +46,7 @@ error_set!(
#[display("Invalid configuration")]
InvalidConfiguration,
+
#[display("Cache config validation failure")]
CacheConfigValidationFailure,
};
@@ -59,6 +61,12 @@ error_set!(
#[display("Invalid S3 credentials")]
InvalidS3Credentials,
+ #[display("HTTP request error: {0}")]
+ CyperError(cyper::Error),
+
+ #[display("Invalid S3 Bucket configuration")]
+ BucketError(BucketError),
+
#[display("Cannot archive file: {}", file_path)]
CannotArchiveFile { file_path: String },
} || IoError;
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index eb851271..c49cdeb6 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -26,6 +26,7 @@ use iggy_common::{Aes256GcmEncryptor, EncryptorKind};
use tracing::info;
use crate::{
+ archiver::{Archiver, ArchiverKind},
bootstrap::resolve_persister,
configs::server::ServerConfig,
map_toggle_str,
@@ -44,6 +45,7 @@ pub struct IggyShardBuilder {
config: Option<ServerConfig>,
encryptor: Option<EncryptorKind>,
version: Option<SemanticVersion>,
+ archiver: Option<ArchiverKind>,
state: Option<StateKind>,
}
@@ -78,6 +80,11 @@ impl IggyShardBuilder {
self
}
+ pub fn archiver(mut self, archiver: Option<ArchiverKind>) -> Self {
+ self.archiver = archiver;
+ self
+ }
+
// TODO: Too much happens in there, some of those bootstrapping logic
should be moved outside.
pub fn build(self) -> IggyShard {
let id = self.id.unwrap();
@@ -105,6 +112,7 @@ impl IggyShardBuilder {
config.system.clone(),
partition_persister,
));
+ let archiver = self.archiver.map(Rc::new);
IggyShard {
id: id,
@@ -112,6 +120,7 @@ impl IggyShardBuilder {
shards_table: Default::default(),
storage: storage,
encryptor: encryptor,
+ archiver: archiver,
state: state,
config: config,
version: version,
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index b884a92d..43280c77 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -50,12 +50,16 @@ use tracing::{error, info, instrument, warn};
use transmission::connector::{Receiver, ShardConnector, StopReceiver,
StopSender};
use crate::{
+ archiver::ArchiverKind,
configs::server::ServerConfig,
io::fs_utils,
shard::{
system::info::SystemInfo,
task_registry::TaskRegistry,
- tasks::messages::spawn_shard_message_task,
+ tasks::{
+ auxilary::maintain_messages::spawn_message_maintainance_task,
+ messages::spawn_shard_message_task,
+ },
transmission::{
event::ShardEvent,
frame::{ShardFrame, ShardResponse},
@@ -141,6 +145,7 @@ pub struct IggyShard {
pub(crate) state: StateKind,
pub(crate) encryptor: Option<EncryptorKind>,
+ pub(crate) archiver: Option<Rc<ArchiverKind>>,
pub(crate) config: ServerConfig,
//TODO: This could be shared.
pub(crate) client_manager: RefCell<ClientManager>,
@@ -182,8 +187,8 @@ impl IggyShard {
let (stop_sender, stop_receiver) = async_channel::unbounded();
let shard = Self {
- id: 0, // Default shard ID
- shards: Vec::new(), // No other shards in default config
+ id: 0,
+ shards: Vec::new(),
shards_table: Default::default(),
version,
streams: Default::default(),
@@ -191,6 +196,7 @@ impl IggyShard {
storage,
state,
encryptor: None,
+ archiver: None,
config: server_config,
client_manager: Default::default(),
active_sessions: Default::default(),
@@ -223,15 +229,12 @@ impl IggyShard {
let _ = self.load_users(users.into_values().collect()).await;
let _ = self.load_streams(streams.into_values().collect()).await;
- //TODO: Fix the archiver.
- /*
if let Some(archiver) = self.archiver.as_ref() {
archiver
.init()
.await
.expect("Failed to initialize archiver");
}
- */
info!("Initialized system in {} ms.", now.elapsed().as_millis());
Ok(())
}
@@ -248,6 +251,7 @@ impl IggyShard {
// Create all tasks (tcp listener, http listener, command processor,
in the future also the background jobs).
let mut tasks: Vec<Task> =
vec![Box::pin(spawn_shard_message_task(self.clone()))];
+ tasks.push(Box::pin(spawn_message_maintainance_task(self.clone())));
if self.config.tcp.enabled {
tasks.push(Box::pin(spawn_tcp_server(self.clone())));
}
diff --git a/core/server/src/channels/commands/maintain_messages.rs
b/core/server/src/shard/tasks/auxilary/maintain_messages.rs
similarity index 59%
rename from core/server/src/channels/commands/maintain_messages.rs
rename to core/server/src/shard/tasks/auxilary/maintain_messages.rs
index 77a60999..e3edea69 100644
--- a/core/server/src/channels/commands/maintain_messages.rs
+++ b/core/server/src/shard/tasks/auxilary/maintain_messages.rs
@@ -1,222 +1,124 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-//TODO: Fixme
-/*
-
-use crate::archiver::ArchiverKind;
-use crate::channels::server_command::BackgroundServerCommand;
-use crate::configs::server::MessagesMaintenanceConfig;
-use crate::map_toggle_str;
-use crate::streaming::systems::system::SharedSystem;
-use crate::streaming::topics::topic::Topic;
+use crate::{
+ archiver::ArchiverKind, configs::server::MessagesMaintenanceConfig,
map_toggle_str,
+ shard::IggyShard, streaming::topics::topic::Topic,
+};
+use compio::time;
use error_set::ErrContext;
-use flume::Sender;
-use iggy_common::IggyDuration;
-use iggy_common::IggyError;
-use iggy_common::IggyTimestamp;
-use iggy_common::locking::IggyRwLockFn;
-use std::sync::Arc;
-use tokio::time;
-use tracing::{debug, error, info, instrument, trace};
-
-pub struct MessagesMaintainer {
- cleaner_enabled: bool,
- archiver_enabled: bool,
- interval: IggyDuration,
- sender: Sender<MaintainMessagesCommand>,
-}
-
-#[derive(Debug, Default, Clone)]
-pub struct MaintainMessagesCommand {
- clean_messages: bool,
- archive_messages: bool,
-}
-
-#[derive(Debug, Default, Clone)]
-pub struct MaintainMessagesExecutor;
-
-impl MessagesMaintainer {
- pub fn new(
- config: &MessagesMaintenanceConfig,
- sender: Sender<MaintainMessagesCommand>,
- ) -> Self {
- Self {
- cleaner_enabled: config.cleaner_enabled,
- archiver_enabled: config.archiver_enabled,
- interval: config.interval,
- sender,
- }
+use futures::FutureExt;
+use iggy_common::{IggyDuration, IggyError, IggyTimestamp,
locking::IggyRwLockFn};
+use std::{rc::Rc, time::Duration};
+use tracing::{debug, error, info, trace};
+
+pub async fn spawn_message_maintainance_task(shard: Rc<IggyShard>) ->
Result<(), IggyError> {
+ let config = &shard.config.data_maintenance.messages;
+ if !config.cleaner_enabled && !config.archiver_enabled {
+ info!("Messages maintainer is disabled.");
+ return Ok(());
}
- pub fn start(&self) {
- if !self.cleaner_enabled && !self.archiver_enabled {
- info!("Messages maintainer is disabled.");
- return;
- }
-
- let interval = self.interval;
- let sender = self.sender.clone();
- info!(
- "Message maintainer, cleaner is {}, archiver is {}, interval:
{interval}",
- map_toggle_str(self.cleaner_enabled),
- map_toggle_str(self.archiver_enabled)
- );
- let clean_messages = self.cleaner_enabled;
- let archive_messages = self.archiver_enabled;
- tokio::spawn(async move {
- let mut interval_timer = time::interval(interval.get_duration());
+ let interval = config.interval;
+ info!(
+ "Message maintainer, cleaner is {}, archiver is {}, interval:
{interval}",
+ map_toggle_str(config.cleaner_enabled),
+ map_toggle_str(config.archiver_enabled)
+ );
+ let clean_messages = config.cleaner_enabled;
+ let mut interval_timer = time::interval(interval.get_duration());
+ loop {
+ let shutdown_check = async {
loop {
- interval_timer.tick().await;
- sender
- .send(MaintainMessagesCommand {
- clean_messages,
- archive_messages,
- })
- .unwrap_or_else(|err| {
- error!("Failed to send MaintainMessagesCommand. Error:
{}", err);
- });
- }
- });
- }
-}
-
-impl BackgroundServerCommand<MaintainMessagesCommand> for
MaintainMessagesExecutor {
- #[instrument(skip_all, name = "trace_maintain_messages")]
- async fn execute(&mut self, system: &SharedSystem, command:
MaintainMessagesCommand) {
- let system = system.read().await;
- let streams = system.get_streams();
- for stream in streams {
- let topics = stream.get_topics();
- for topic in topics {
- let archiver = if command.archive_messages {
- system.archiver.clone()
- } else {
- None
- };
- let expired_segments = handle_expired_segments(
- topic,
- archiver.clone(),
- system.config.segment.archive_expired,
- command.clean_messages,
- )
- .await;
- if expired_segments.is_err() {
- error!(
- "Failed to get expired segments for stream ID: {},
topic ID: {}",
- topic.stream_id, topic.topic_id
- );
- continue;
+ if shard.is_shutting_down() {
+ return;
}
-
- let oldest_segments = handle_oldest_segments(
- topic,
- archiver.clone(),
- system.config.topic.delete_oldest_segments,
- )
- .await;
- if oldest_segments.is_err() {
- error!(
- "Failed to get oldest segments for stream ID: {},
topic ID: {}",
- topic.stream_id, topic.topic_id
- );
- continue;
- }
-
- let deleted_expired_segments = expired_segments.unwrap();
- let deleted_oldest_segments = oldest_segments.unwrap();
- let deleted_segments = HandledSegments {
- segments_count: deleted_expired_segments.segments_count
- + deleted_oldest_segments.segments_count,
- messages_count: deleted_expired_segments.messages_count
- + deleted_oldest_segments.messages_count,
- };
-
- if deleted_segments.segments_count == 0 {
- trace!(
- "No segments were deleted for stream ID: {}, topic ID:
{}",
- topic.stream_id, topic.topic_id
- );
- continue;
- }
-
- info!(
- "Deleted {} segments and {} messages for stream ID: {},
topic ID: {}",
- deleted_segments.segments_count,
- deleted_segments.messages_count,
- topic.stream_id,
- topic.topic_id
- );
-
- system
- .metrics
- .decrement_segments(deleted_segments.segments_count);
- system
- .metrics
- .decrement_messages(deleted_segments.messages_count);
+ compio::time::sleep(Duration::from_millis(100)).await;
}
- }
- }
+ };
+ let archiver = shard.archiver.clone();
+ let fut = interval_timer.tick();
+
+ futures::select! {
+ _ = shutdown_check.fuse() => {
+ info!("Shard {} message maintainer shutting down", shard.id);
+ break;
+ }
+ _ = fut.fuse() => {
+ let streams = shard.get_streams();
+ for stream in streams {
+ let topics = stream.get_topics();
+ for topic in topics {
+ let expired_segments = handle_expired_segments(
+ topic,
+ archiver.clone(),
+ shard.config.system.segment.archive_expired,
+ clean_messages,
+ )
+ .await;
+ if expired_segments.is_err() {
+ error!(
+ "Failed to get expired segments for stream ID:
{}, topic ID: {}",
+ topic.stream_id, topic.topic_id
+ );
+ continue;
+ }
+
+ let stream_id = stream.stream_id;
+ let oldest_segments = handle_oldest_segments(
+ stream_id,
+ topic,
+ archiver.clone(),
+ shard.config.system.topic.delete_oldest_segments,
+ )
+ .await;
+ if oldest_segments.is_err() {
+ error!(
+ "Failed to get oldest segments for stream ID:
{}, topic ID: {}",
+ topic.stream_id, topic.topic_id
+ );
+ continue;
+ }
+
+ let deleted_expired_segments =
expired_segments.unwrap();
+ let deleted_oldest_segments = oldest_segments.unwrap();
+ let deleted_segments = HandledSegments {
+ segments_count:
deleted_expired_segments.segments_count
+ + deleted_oldest_segments.segments_count,
+ messages_count:
deleted_expired_segments.messages_count
+ + deleted_oldest_segments.messages_count,
+ };
+
+ if deleted_segments.segments_count == 0 {
+ trace!(
+ "No segments were deleted for stream ID: {},
topic ID: {}",
+ topic.stream_id, topic.topic_id
+ );
+ continue;
+ }
+
+ info!(
+ "Deleted {} segments and {} messages for stream
ID: {}, topic ID: {}",
+ deleted_segments.segments_count,
+ deleted_segments.messages_count,
+ topic.stream_id,
+ topic.topic_id
+ );
- fn start_command_sender(
- &mut self,
- _system: SharedSystem,
- config: &crate::configs::server::ServerConfig,
- sender: Sender<MaintainMessagesCommand>,
- ) {
- if (!config.data_maintenance.archiver.enabled
- || !config.data_maintenance.messages.archiver_enabled)
- && !config.data_maintenance.messages.cleaner_enabled
- {
- return;
+ shard
+ .metrics
+
.decrement_segments(deleted_segments.segments_count);
+ shard
+ .metrics
+
.decrement_messages(deleted_segments.messages_count);
+ }
+ }
}
-
- let messages_maintainer =
- MessagesMaintainer::new(&config.data_maintenance.messages, sender);
- messages_maintainer.start();
- }
-
- fn start_command_consumer(
- mut self,
- system: SharedSystem,
- config: &crate::configs::server::ServerConfig,
- receiver: flume::Receiver<MaintainMessagesCommand>,
- ) {
- if (!config.data_maintenance.archiver.enabled
- || !config.data_maintenance.messages.archiver_enabled)
- && !config.data_maintenance.messages.cleaner_enabled
- {
- return;
}
-
- tokio::spawn(async move {
- let system = system.clone();
- while let Ok(command) = receiver.recv_async().await {
- self.execute(&system, command).await;
- }
- info!("Messages maintainer receiver stopped.");
- });
}
+ Ok(())
}
async fn handle_expired_segments(
topic: &Topic,
- archiver: Option<Arc<ArchiverKind>>,
+ archiver: Option<Rc<ArchiverKind>>,
archive: bool,
clean: bool,
) -> Result<HandledSegments, IggyError> {
@@ -287,8 +189,9 @@ async fn get_expired_segments(topic: &Topic, now:
IggyTimestamp) -> Vec<Segments
}
async fn handle_oldest_segments(
+ stream_id: u32,
topic: &Topic,
- archiver: Option<Arc<ArchiverKind>>,
+ archiver: Option<Rc<ArchiverKind>>,
delete_oldest_segments: bool,
) -> Result<HandledSegments, IggyError> {
if let Some(archiver) = archiver {
@@ -296,11 +199,7 @@ async fn handle_oldest_segments(
for partition in topic.partitions.values() {
let mut start_offsets = Vec::new();
let partition = partition.read().await;
- for segment in partition.get_segments() {
- if !segment.is_closed() {
- continue;
- }
-
+ for segment in partition.get_segments().iter().filter(|s|
s.is_closed()) {
let is_archived =
archiver.is_archived(segment.index_file_path(), None).await;
if is_archived.is_err() {
error!(
@@ -325,6 +224,7 @@ async fn handle_oldest_segments(
start_offsets.push(segment.start_offset());
}
}
+
if !start_offsets.is_empty() {
info!(
"Found {} segments to archive for stream ID: {}, topic ID:
{}, partition ID: {}",
@@ -340,11 +240,13 @@ async fn handle_oldest_segments(
}
}
+ let segments_count = segments_to_archive
+ .iter()
+ .map(|s| s.start_offsets.len())
+ .sum::<usize>();
info!(
"Archiving {} oldest segments for stream ID: {}, topic ID: {}...",
- segments_to_archive.len(),
- topic.stream_id,
- topic.topic_id,
+ segments_count, topic.stream_id, topic.topic_id,
);
archive_segments(topic, &segments_to_archive, archiver.clone())
.await
@@ -446,17 +348,19 @@ impl HandledSegments {
async fn archive_segments(
topic: &Topic,
segments_to_archive: &[SegmentsToHandle],
- archiver: Arc<ArchiverKind>,
+ archiver: Rc<ArchiverKind>,
) -> Result<u64, IggyError> {
if segments_to_archive.is_empty() {
return Ok(0);
}
+ let segments_count = segments_to_archive
+ .iter()
+ .map(|s| s.start_offsets.len())
+ .sum::<usize>();
info!(
"Found {} segments to archive for stream ID: {}, topic ID: {},
archiving...",
- segments_to_archive.len(),
- topic.stream_id,
- topic.topic_id
+ segments_count, topic.stream_id, topic.topic_id
);
let mut archived_segments = 0;
@@ -556,5 +460,3 @@ async fn delete_segments(
messages_count,
})
}
-
-*/
diff --git a/core/server/src/shard/tasks/auxilary/mod.rs
b/core/server/src/shard/tasks/auxilary/mod.rs
new file mode 100644
index 00000000..944cab55
--- /dev/null
+++ b/core/server/src/shard/tasks/auxilary/mod.rs
@@ -0,0 +1 @@
+pub mod maintain_messages;
diff --git a/core/server/src/shard/tasks/mod.rs
b/core/server/src/shard/tasks/mod.rs
index ba63992f..a5ec47da 100644
--- a/core/server/src/shard/tasks/mod.rs
+++ b/core/server/src/shard/tasks/mod.rs
@@ -1 +1,2 @@
+pub mod auxilary;
pub mod messages;
diff --git a/core/server/src/streaming/partitions/persistence.rs
b/core/server/src/streaming/partitions/persistence.rs
index 824bbddf..e4ee2d9e 100644
--- a/core/server/src/streaming/partitions/persistence.rs
+++ b/core/server/src/streaming/partitions/persistence.rs
@@ -19,11 +19,11 @@
use crate::state::system::PartitionState;
use crate::streaming::partitions::COMPONENT;
use crate::streaming::partitions::partition::Partition;
+use compio::fs::create_dir_all;
use error_set::ErrContext;
use iggy_common::IggyError;
use std::path::Path;
use std::sync::atomic::Ordering;
-use compio::fs::create_dir_all;
use tracing::error;
impl Partition {
diff --git a/core/server/src/streaming/segments/writing_messages.rs
b/core/server/src/streaming/segments/writing_messages.rs
index df177bdf..e2b0fbb9 100644
--- a/core/server/src/streaming/segments/writing_messages.rs
+++ b/core/server/src/streaming/segments/writing_messages.rs
@@ -159,7 +159,9 @@ impl Segment {
}
self.shutdown_writing().await;
info!(
- "Closed segment with start offset: {}, end offset: {}, size:
{} for partition with ID: {}.",
+ "Closed segment for stream: {}, topic: {} with start offset:
{}, end offset: {}, size: {} for partition with ID: {}.",
+ self.stream_id,
+ self.topic_id,
self.start_offset,
self.end_offset,
self.get_messages_size(),
diff --git a/core/server/src/streaming/utils/file.rs
b/core/server/src/streaming/utils/file.rs
index de00aa5e..6f70363c 100644
--- a/core/server/src/streaming/utils/file.rs
+++ b/core/server/src/streaming/utils/file.rs
@@ -19,10 +19,6 @@
use compio::fs::{File, OpenOptions, remove_file};
use std::path::Path;
-pub fn open_std(path: &str) -> Result<std::fs::File, std::io::Error> {
- std::fs::OpenOptions::new().read(true).open(path)
-}
-
pub async fn open(path: &str) -> Result<File, std::io::Error> {
OpenOptions::new().read(true).open(path).await
}