This is an automated email from the ASF dual-hosted git repository. piotr pushed a commit to branch fix_rust_producer in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 0e62ea2d3ba9c90157b8ac7bf11e825879c12258 Author: spetz <[email protected]> AuthorDate: Tue Jan 27 07:54:58 2026 +0100 fix(sdk): improve high-level SDK direct and background producers --- Cargo.lock | 144 +++++++++++++++++----- Cargo.toml | 20 +-- DEPENDENCIES.md | 13 +- core/ai/mcp/Cargo.toml | 2 +- core/bench/dashboard/server/Cargo.toml | 2 +- core/common/Cargo.toml | 2 +- core/integration/Cargo.toml | 4 +- core/integration/src/test_mcp_server.rs | 1 + core/integration/tests/mcp/mod.rs | 5 +- core/integration/tests/sdk/producer/background.rs | 143 +++++++++++++++++++++ core/sdk/Cargo.toml | 3 +- core/sdk/src/clients/consumer.rs | 73 ++++++++++- core/sdk/src/clients/producer.rs | 44 +++---- core/sdk/src/clients/producer_config.rs | 44 ++++--- core/sdk/src/clients/producer_dispatcher.rs | 9 +- core/sdk/src/clients/producer_sharding.rs | 61 ++++++++- core/sdk/src/prelude.rs | 1 + core/server/Cargo.toml | 4 +- 18 files changed, 463 insertions(+), 112 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c8894b31d..06f488f72 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1093,7 +1093,7 @@ dependencies = [ "rand 0.9.2", "serde", "serde_json", - "sysinfo", + "sysinfo 0.38.0", "tracing", "uuid", ] @@ -1317,7 +1317,7 @@ version = "3.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89ec27229c38ed0eb3c0feee3d2c1d6a4379ae44f418a29a658890e062d8f365" dependencies = [ - "darling 0.23.0", + "darling 0.20.11", "ident_case", "prettyplease", "proc-macro2", @@ -1665,7 +1665,7 @@ version = "3.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "faf9468729b8cbcea668e36183cb69d317348c2e08e994829fb56ebfdfbaac34" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.48.0", ] [[package]] @@ -2224,7 +2224,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73736a89c4aff73035ba2ed2e565061954da00d4970fc9ac25dcc85a2a20d790" dependencies = [ "dispatch2", - "nix", + "nix 0.30.1", "windows-sys 0.61.2", ] @@ -2665,7 +2665,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -2969,7 +2969,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -4175,7 +4175,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.6.2", + "socket2 0.5.10", "tokio", "tower-service", "tracing", @@ -4407,7 +4407,7 @@ dependencies = [ [[package]] name = "iggy" -version = "0.8.1-edge.5" +version = "0.8.1-edge.6" dependencies = [ "async-broadcast", "async-dropper", @@ -4421,7 +4421,6 @@ dependencies = [ "iggy_binary_protocol", "iggy_common", "mockall", - "num_cpus", "quinn", "reqwest", "reqwest-middleware", @@ -4458,7 +4457,7 @@ dependencies = [ "rand 0.9.2", "rayon", "serde", - "sysinfo", + "sysinfo 0.38.0", "tokio", "tracing", "tracing-appender", @@ -4639,7 +4638,7 @@ dependencies = [ "figment", "human-repr", "humantime", - "nix", + "nix 0.31.1", "once_cell", "rcgen", "ring", @@ -5089,7 +5088,7 @@ dependencies = [ "portable-atomic", "portable-atomic-util", "serde_core", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -5782,6 +5781,18 @@ dependencies = [ "libc", ] +[[package]] +name = "nix" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "225e7cfe711e0ba79a68baeddb2982723e4235247aefce1482f2f16c27865b66" +dependencies = [ + "bitflags 2.10.0", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "nom" version = "7.1.3" @@ -5893,7 +5904,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -6349,7 +6360,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d8fae84b431384b68627d0f9b3b1245fcf9f46f6c0e3dc902e9dce64edd1967" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.48.0", ] [[package]] @@ -7070,7 +7081,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", - "socket2 0.6.2", + "socket2 0.5.10", "thiserror 2.0.18", "tokio", "tracing", @@ -7109,9 +7120,9 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.6.2", + "socket2 0.5.10", "tracing", - "windows-sys 0.60.2", + "windows-sys 0.59.0", ] [[package]] @@ -7532,9 +7543,9 @@ dependencies = [ [[package]] name = "rmcp" -version = "0.13.0" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1815dbc06c414d720f8bc1951eccd66bc99efc6376331f1e7093a119b3eb508" +checksum = "0a621b37a548ff6ab6292d57841eb25785a7f146d89391a19c9f199414bd13da" dependencies = [ "async-trait", "axum", @@ -7565,9 +7576,9 @@ dependencies = [ [[package]] name = "rmcp-macros" -version = "0.13.0" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11f0bc7008fa102e771a76c6d2c9b253be3f2baa5964e060464d038ae1cbc573" +checksum = "6b79ed92303f9262db79575aa8c3652581668e9d136be6fd0b9ededa78954c95" dependencies = [ "darling 0.23.0", "proc-macro2", @@ -7725,7 +7736,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -7793,7 +7804,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -7979,7 +7990,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b55fb86dfd3a2f5f76ea78310a88f96c4ea21a3031f8d212443d56123fd0521" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -8247,7 +8258,7 @@ dependencies = [ "mimalloc", "mime_guess", "moka", - "nix", + "nix 0.31.1", "opentelemetry", "opentelemetry-appender-tracing", "opentelemetry-otlp", @@ -8269,7 +8280,7 @@ dependencies = [ "socket2 0.6.2", "static-toml", "strum", - "sysinfo", + "sysinfo 0.38.0", "tempfile", "thiserror 2.0.18", "tokio", @@ -8887,7 +8898,21 @@ dependencies = [ "ntapi", "objc2-core-foundation", "objc2-io-kit", - "windows", + "windows 0.61.3", +] + +[[package]] +name = "sysinfo" +version = "0.38.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe840c5b1afe259a5657392a4dbb74473a14c8db999c3ec2f4ae812e028a94da" +dependencies = [ + "libc", + "memchr", + "ntapi", + "objc2-core-foundation", + "objc2-io-kit", + "windows 0.62.2", ] [[package]] @@ -8923,7 +8948,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -9884,7 +9909,7 @@ dependencies = [ "regex", "rustc_version", "rustversion", - "sysinfo", + "sysinfo 0.37.2", "time", "vergen-lib", ] @@ -10148,7 +10173,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.48.0", ] [[package]] @@ -10163,11 +10188,23 @@ version = "0.61.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9babd3a767a4c1aef6900409f85f5d53ce2544ccdfaa86dad48c91782c6d6893" dependencies = [ - "windows-collections", + "windows-collections 0.2.0", "windows-core 0.61.2", - "windows-future", + "windows-future 0.2.1", "windows-link 0.1.3", - "windows-numerics", + "windows-numerics 0.2.0", +] + +[[package]] +name = "windows" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "527fadee13e0c05939a6a05d5bd6eec6cd2e3dbd648b9f8e447c6518133d8580" +dependencies = [ + "windows-collections 0.3.2", + "windows-core 0.62.2", + "windows-future 0.3.2", + "windows-numerics 0.3.1", ] [[package]] @@ -10179,6 +10216,15 @@ dependencies = [ "windows-core 0.61.2", ] +[[package]] +name = "windows-collections" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b2d95af1a8a14a3c7367e1ed4fc9c20e0a26e79551b1454d72583c97cc6610" +dependencies = [ + "windows-core 0.62.2", +] + [[package]] name = "windows-core" version = "0.61.2" @@ -10213,7 +10259,18 @@ checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e" dependencies = [ "windows-core 0.61.2", "windows-link 0.1.3", - "windows-threading", + "windows-threading 0.1.0", +] + +[[package]] +name = "windows-future" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d6f90251fe18a279739e78025bd6ddc52a7e22f921070ccdc67dde84c605cb" +dependencies = [ + "windows-core 0.62.2", + "windows-link 0.2.1", + "windows-threading 0.2.1", ] [[package]] @@ -10260,6 +10317,16 @@ dependencies = [ "windows-link 0.1.3", ] +[[package]] +name = "windows-numerics" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e2e40844ac143cdb44aead537bbf727de9b044e107a0f1220392177d15b0f26" +dependencies = [ + "windows-core 0.62.2", + "windows-link 0.2.1", +] + [[package]] name = "windows-result" version = "0.3.4" @@ -10422,6 +10489,15 @@ dependencies = [ "windows-link 0.1.3", ] +[[package]] +name = "windows-threading" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3949bd5b99cafdf1c7ca86b43ca564028dfe27d66958f2470940f73d86d75b37" +dependencies = [ + "windows-link 0.2.1", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.42.2" diff --git a/Cargo.toml b/Cargo.toml index d845a659a..80c364d9f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,9 +58,9 @@ aes-gcm = "0.10.3" ahash = { version = "0.8.12", features = ["serde"] } anyhow = "1.0.100" argon2 = "0.5.3" -arrow = "57.0.0" -arrow-array = "57.0.0" -arrow-json = "57.0.0" +arrow = "57.2.0" +arrow-array = "57.2.0" +arrow-json = "57.2.0" async-broadcast = "0.7.2" async-channel = "2.5.0" async-dropper = { version = "0.3.1", features = ["tokio", "simple"] } @@ -147,7 +147,7 @@ log = "0.4.29" metadata = { path = "core/metadata" } mimalloc = "0.1" mockall = "0.14.0" -nix = { version = "0.30.1", features = ["fs", "resource", "sched"] } +nix = { version = "0.31.1", features = ["fs", "resource", "sched"] } nonzero_lit = "0.1.2" once_cell = "1.21.3" opentelemetry = { version = "0.31.0", features = ["trace", "logs"] } @@ -168,7 +168,7 @@ opentelemetry_sdk = { version = "0.31.0", features = [ "experimental_logs_batch_log_processor_with_async_runtime", "experimental_trace_batch_span_processor_with_async_runtime", ] } -parquet = "57.0.0" +parquet = "57.2.0" passterm = "=2.0.1" postcard = { version = "1.1.3", features = ["alloc"] } @@ -197,13 +197,13 @@ serde_yaml_ng = "0.10.0" serial_test = "3.3.1" server = { path = "core/server" } simd-json = { version = "0.17.0", features = ["serde_impl"] } -slab = "0.4.1" +slab = "0.4.11" strum = { version = "0.27.2", features = ["derive"] } strum_macros = "0.27.2" -sysinfo = "0.37.2" +sysinfo = "0.38.0" tempfile = "3.24.0" test-case = "3.3.1" -thiserror = "2.0.17" +thiserror = "2.0.18" tokio = { version = "1.49.0", features = ["full"] } tokio-rustls = "0.26.4" tokio-tungstenite = { version = "0.28", features = ["rustls-tls-webpki-roots"] } @@ -225,7 +225,7 @@ tracing-subscriber = { version = "0.3.22", default-features = false, features = trait-variant = "0.1.2" tungstenite = "0.28.0" twox-hash = { version = "2.1.2", features = ["xxhash32"] } -uuid = { version = "1.19.0", features = [ +uuid = { version = "1.20.0", features = [ "v4", "v7", "fast-rng", @@ -233,7 +233,7 @@ uuid = { version = "1.19.0", features = [ "zerocopy", ] } webpki-roots = "1.0.5" -zip = { version = "7.1.0", default-features = false, features = ["deflate"] } +zip = { version = "7.2.0", default-features = false, features = ["deflate"] } [profile.release] lto = true diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index 7d4f8133a..5f762d210 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -383,7 +383,7 @@ icu_provider: 2.1.1, "Unicode-3.0", ident_case: 1.0.1, "Apache-2.0 OR MIT", idna: 1.1.0, "Apache-2.0 OR MIT", idna_adapter: 1.2.1, "Apache-2.0 OR MIT", -iggy: 0.8.1-edge.5, "Apache-2.0", +iggy: 0.8.1-edge.6, "Apache-2.0", iggy-bench: 0.3.1-edge.1, "Apache-2.0", iggy-bench-dashboard-server: 0.5.1-edge.1, "Apache-2.0", iggy-cli: 0.10.1-edge.1, "Apache-2.0", @@ -496,6 +496,7 @@ moka: 0.12.13, "(Apache-2.0 OR MIT) AND Apache-2.0", murmur3: 0.5.2, "Apache-2.0 OR MIT", never-say-never: 6.6.666, "Apache-2.0 OR MIT OR Zlib", nix: 0.30.1, "MIT", +nix: 0.31.1, "MIT", nom: 7.1.3, "MIT", nom: 8.0.0, "MIT", nom_locate: 5.0.0, "MIT", @@ -654,8 +655,8 @@ ring: 0.17.14, "Apache-2.0 AND ISC", ringbuffer: 0.16.0, "MIT", rkyv: 0.7.46, "MIT", rkyv_derive: 0.7.46, "MIT", -rmcp: 0.13.0, "Apache-2.0", -rmcp-macros: 0.13.0, "Apache-2.0", +rmcp: 0.14.0, "Apache-2.0", +rmcp-macros: 0.14.0, "Apache-2.0", rmp: 0.8.15, "MIT", rmp-serde: 1.3.1, "MIT", roaring: 0.11.3, "Apache-2.0 OR MIT", @@ -765,6 +766,7 @@ synthez: 0.4.0, "BlueOak-1.0.0", synthez-codegen: 0.4.0, "BlueOak-1.0.0", synthez-core: 0.4.0, "BlueOak-1.0.0", sysinfo: 0.37.2, "MIT", +sysinfo: 0.38.0, "MIT", tagptr: 0.2.0, "Apache-2.0 OR MIT", tap: 1.0.1, "MIT", tar: 0.4.44, "Apache-2.0 OR MIT", @@ -890,15 +892,19 @@ winapi-i686-pc-windows-gnu: 0.4.0, "Apache-2.0 OR MIT", winapi-util: 0.1.11, "MIT OR Unlicense", winapi-x86_64-pc-windows-gnu: 0.4.0, "Apache-2.0 OR MIT", windows: 0.61.3, "Apache-2.0 OR MIT", +windows: 0.62.2, "Apache-2.0 OR MIT", windows-collections: 0.2.0, "Apache-2.0 OR MIT", +windows-collections: 0.3.2, "Apache-2.0 OR MIT", windows-core: 0.61.2, "Apache-2.0 OR MIT", windows-core: 0.62.2, "Apache-2.0 OR MIT", windows-future: 0.2.1, "Apache-2.0 OR MIT", +windows-future: 0.3.2, "Apache-2.0 OR MIT", windows-implement: 0.60.2, "Apache-2.0 OR MIT", windows-interface: 0.59.3, "Apache-2.0 OR MIT", windows-link: 0.1.3, "Apache-2.0 OR MIT", windows-link: 0.2.1, "Apache-2.0 OR MIT", windows-numerics: 0.2.0, "Apache-2.0 OR MIT", +windows-numerics: 0.3.1, "Apache-2.0 OR MIT", windows-result: 0.3.4, "Apache-2.0 OR MIT", windows-result: 0.4.1, "Apache-2.0 OR MIT", windows-strings: 0.4.2, "Apache-2.0 OR MIT", @@ -914,6 +920,7 @@ windows-targets: 0.48.5, "Apache-2.0 OR MIT", windows-targets: 0.52.6, "Apache-2.0 OR MIT", windows-targets: 0.53.5, "Apache-2.0 OR MIT", windows-threading: 0.1.0, "Apache-2.0 OR MIT", +windows-threading: 0.2.1, "Apache-2.0 OR MIT", windows_aarch64_gnullvm: 0.42.2, "Apache-2.0 OR MIT", windows_aarch64_gnullvm: 0.48.5, "Apache-2.0 OR MIT", windows_aarch64_gnullvm: 0.52.6, "Apache-2.0 OR MIT", diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml index f8f8c138e..a5c4b10c3 100644 --- a/core/ai/mcp/Cargo.toml +++ b/core/ai/mcp/Cargo.toml @@ -41,7 +41,7 @@ opentelemetry-otlp = { workspace = true } opentelemetry-semantic-conventions = { workspace = true } opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] } reqwest = { workspace = true } -rmcp = { version = "0.13.0", features = [ +rmcp = { version = "0.14.0", features = [ "server", "transport-io", "transport-streamable-http-server", diff --git a/core/bench/dashboard/server/Cargo.toml b/core/bench/dashboard/server/Cargo.toml index 6303a663a..0ae74810b 100644 --- a/core/bench/dashboard/server/Cargo.toml +++ b/core/bench/dashboard/server/Cargo.toml @@ -30,7 +30,7 @@ bench-report = { workspace = true } chrono = { workspace = true, features = ["serde"] } clap = { workspace = true } dashmap = { workspace = true } -file-operation = "0.8.11" +file-operation = "0.8.17" notify = "8.2.0" octocrab = "0.49.5" serde = { workspace = true, features = ["derive"] } diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml index a529c1ab3..45ad65e51 100644 --- a/core/common/Cargo.toml +++ b/core/common/Cargo.toml @@ -57,7 +57,7 @@ human-repr = { workspace = true } humantime = { workspace = true } nix = { workspace = true } once_cell = { workspace = true } -rcgen = "0.14.6" +rcgen = "0.14.7" ring = "0.17.14" rustls = { workspace = true } serde = { workspace = true } diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml index 6dec491f5..87b95b6f1 100644 --- a/core/integration/Cargo.toml +++ b/core/integration/Cargo.toml @@ -47,9 +47,9 @@ libc = "0.2.180" log = { workspace = true } predicates = { workspace = true } rand = { workspace = true } -rcgen = "0.14.6" +rcgen = "0.14.7" reqwest = { workspace = true } -rmcp = { version = "0.13.0", features = [ +rmcp = { version = "0.14.0", features = [ "client", "reqwest", "transport-streamable-http-client", diff --git a/core/integration/src/test_mcp_server.rs b/core/integration/src/test_mcp_server.rs index 9d729eaa8..f4179a698 100644 --- a/core/integration/src/test_mcp_server.rs +++ b/core/integration/src/test_mcp_server.rs @@ -182,6 +182,7 @@ impl TestMcpServer { let client_info = ClientInfo { protocol_version: Default::default(), capabilities: ClientCapabilities::default(), + meta: None, client_info: Implementation { name: "test-mcp-client".to_string(), version: "1.0.0".to_string(), diff --git a/core/integration/tests/mcp/mod.rs b/core/integration/tests/mcp/mod.rs index 3fab31930..0fadd7ea3 100644 --- a/core/integration/tests/mcp/mod.rs +++ b/core/integration/tests/mcp/mod.rs @@ -35,7 +35,7 @@ use integration::{ use lazy_static::lazy_static; use rmcp::{ ServiceError, - model::{CallToolRequestParam, CallToolResult, ListToolsResult}, + model::{CallToolRequestParams, CallToolResult, ListToolsResult}, serde::de::DeserializeOwned, serde_json::{self, json}, }; @@ -698,10 +698,11 @@ impl TestMcpClient { data: Option<serde_json::Value>, ) -> Result<CallToolResult, ServiceError> { self.mcp_client - .call_tool(CallToolRequestParam { + .call_tool(CallToolRequestParams { name: method.to_owned().into(), arguments: data.and_then(|value| value.as_object().cloned()), task: None, + meta: None, }) .await } diff --git a/core/integration/tests/sdk/producer/background.rs b/core/integration/tests/sdk/producer/background.rs index d8d7818ae..8acb05de2 100644 --- a/core/integration/tests/sdk/producer/background.rs +++ b/core/integration/tests/sdk/producer/background.rs @@ -493,3 +493,146 @@ async fn background_linger_time_respected_after_idle() { producer.shutdown().await; cleanup(&client).await; } + +#[tokio::test] +#[parallel] +async fn background_preserves_message_ordering() { + let mut test_server = TestServer::default(); + test_server.start(); + + let tcp_client_config = TcpClientConfig { + server_address: test_server.get_raw_tcp_addr().unwrap(), + ..TcpClientConfig::default() + }; + let client = ClientWrapper::Tcp(TcpClient::create(Arc::new(tcp_client_config)).unwrap()); + let client = IggyClient::create(client, None, None); + + client.connect().await.unwrap(); + login_root(&client).await; + init_system(&client).await; + + let messages_count = 1000u32; + + let cfg = BackgroundConfig::builder() + .linger_time(IggyDuration::from(50_000)) + .build(); + + let producer = client + .producer(STREAM_NAME, TOPIC_NAME) + .unwrap() + .partitioning(Partitioning::partition_id(PARTITION_ID)) + .background(cfg) + .build(); + + for i in 0..messages_count { + let payload = Bytes::from(i.to_le_bytes().to_vec()); + let msg = IggyMessage::builder() + .id(i as u128 + 1) + .payload(payload) + .build() + .unwrap(); + producer.send(vec![msg]).await.unwrap(); + } + + producer.shutdown().await; + + let consumer = Consumer::default(); + let polled_messages = client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + Some(PARTITION_ID), + &consumer, + &PollingStrategy::offset(0), + messages_count, + false, + ) + .await + .unwrap(); + + assert_eq!(polled_messages.messages.len() as u32, messages_count); + + for (idx, msg) in polled_messages.messages.iter().enumerate() { + let expected = idx as u32; + let actual = u32::from_le_bytes(msg.payload[..4].try_into().unwrap()); + assert_eq!( + actual, expected, + "Message at position {} has wrong order: expected {}, got {}", + idx, expected, actual + ); + } + + cleanup(&client).await; +} + +#[tokio::test] +#[parallel] +async fn background_preserves_ordering_with_multiple_shards() { + let mut test_server = TestServer::default(); + test_server.start(); + + let tcp_client_config = TcpClientConfig { + server_address: test_server.get_raw_tcp_addr().unwrap(), + ..TcpClientConfig::default() + }; + let client = ClientWrapper::Tcp(TcpClient::create(Arc::new(tcp_client_config)).unwrap()); + let client = IggyClient::create(client, None, None); + + client.connect().await.unwrap(); + login_root(&client).await; + init_system(&client).await; + + let messages_count = 1000u32; + + let cfg = BackgroundConfig::builder() + .num_shards(4) + .linger_time(IggyDuration::from(50_000)) + .build(); + + let producer = client + .producer(STREAM_NAME, TOPIC_NAME) + .unwrap() + .partitioning(Partitioning::partition_id(PARTITION_ID)) + .background(cfg) + .build(); + + for i in 0..messages_count { + let payload = Bytes::from(i.to_le_bytes().to_vec()); + let msg = IggyMessage::builder() + .id(i as u128 + 1) + .payload(payload) + .build() + .unwrap(); + producer.send(vec![msg]).await.unwrap(); + } + + producer.shutdown().await; + + let consumer = Consumer::default(); + let polled_messages = client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + Some(PARTITION_ID), + &consumer, + &PollingStrategy::offset(0), + messages_count, + false, + ) + .await + .unwrap(); + + assert_eq!(polled_messages.messages.len() as u32, messages_count); + + for (idx, msg) in polled_messages.messages.iter().enumerate() { + let expected = idx as u32; + let actual = u32::from_le_bytes(msg.payload[..4].try_into().unwrap()); + assert_eq!( + actual, expected, + "Message at position {} has wrong order: expected {}, got {}", + idx, expected, actual + ); + } + + cleanup(&client).await; +} diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml index c83cfc06b..6d33647bb 100644 --- a/core/sdk/Cargo.toml +++ b/core/sdk/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy" -version = "0.8.1-edge.5" +version = "0.8.1-edge.6" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2024" license = "Apache-2.0" @@ -48,7 +48,6 @@ futures = { workspace = true } futures-util = { workspace = true } iggy_binary_protocol = { workspace = true } iggy_common = { workspace = true } -num_cpus = "1.17.0" quinn = { workspace = true } reqwest = { workspace = true } reqwest-middleware = { workspace = true } diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs index 3d7784938..73190de1e 100644 --- a/core/sdk/src/clients/consumer.rs +++ b/core/sdk/src/clients/consumer.rs @@ -475,10 +475,6 @@ impl IggyConsumer { tokio::spawn(async move { loop { sleep(interval.get_duration()).await; - if shutdown.load(ORDERING) { - trace!("Shutdown signal received, stopping background offset storage"); - break; - } for entry in last_consumed_offsets.iter() { let partition_id = *entry.key(); let consumed_offset = entry.load(ORDERING); @@ -494,6 +490,10 @@ impl IggyConsumer { ) .await; } + if shutdown.load(ORDERING) { + trace!("Shutdown signal received, stopping background offset storage"); + break; + } } }); } @@ -1056,6 +1056,71 @@ impl Stream for IggyConsumer { } } +impl IggyConsumer { + pub async fn shutdown(&mut self) -> Result<(), IggyError> { + if self.shutdown.swap(true, ORDERING) { + return Ok(()); + } + + info!("Shutting down consumer: {}...", self.consumer_name); + + for entry in self.last_consumed_offsets.iter() { + let partition_id = *entry.key(); + let consumed_offset = entry.load(ORDERING); + + let Some(stored_offset) = self + .last_stored_offsets + .get(&partition_id) + .map(|e| e.load(ORDERING)) + else { + continue; + }; + + if consumed_offset > stored_offset { + trace!( + "Flushing final offset: {consumed_offset} for partition: {partition_id}, stream: {}, topic: {}", + self.stream_id, self.topic_id + ); + let _ = Self::store_consumer_offset( + &self.client, + &self.consumer, + &self.stream_id, + &self.topic_id, + partition_id, + consumed_offset, + &self.last_stored_offsets, + self.allow_replay, + ) + .await; + } + } + + if self.is_consumer_group && self.joined_consumer_group.load(ORDERING) { + let group_id = self.consumer.id.clone(); + trace!( + "Leaving consumer group: {group_id} for stream: {}, topic: {}", + self.stream_id, self.topic_id + ); + + let client = self.client.read().await; + if let Err(error) = client + .leave_consumer_group(&self.stream_id, &self.topic_id, &group_id) + .await + { + warn!( + "Failed to leave consumer group: {group_id} for stream: {}, topic: {}. {error}", + self.stream_id, self.topic_id + ); + } else { + self.joined_consumer_group.store(false, ORDERING); + } + } + + info!("Consumer: {} has been shut down.", self.consumer_name); + Ok(()) + } +} + impl Drop for IggyConsumer { fn drop(&mut self) { self.shutdown.store(true, ORDERING); diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs index 0a6ec018e..67a5bf8d4 100644 --- a/core/sdk/src/clients/producer.rs +++ b/core/sdk/src/clients/producer.rs @@ -185,6 +185,7 @@ impl ProducerCore { messages: &mut [IggyMessage], ) -> Result<(), IggyError> { let client = self.client.read().await; + let Some(max_retries) = self.send_retries_count else { return client .send_messages(stream, topic, partitioning, messages) @@ -197,25 +198,10 @@ impl ProducerCore { .await; } - let mut timer = if let Some(interval) = self.send_retries_interval { - let mut timer = tokio::time::interval(interval.get_duration()); - timer.tick().await; - Some(timer) - } else { - None - }; - - self.wait_until_connected(max_retries, stream, topic, &mut timer) + self.wait_until_connected(max_retries, stream, topic) .await?; - self.send_with_retries( - max_retries, - stream, - topic, - partitioning, - messages, - &mut timer, - ) - .await + self.send_with_retries(&client, max_retries, stream, topic, partitioning, messages) + .await } async fn wait_until_connected( @@ -223,9 +209,10 @@ impl ProducerCore { max_retries: u32, stream: &Identifier, topic: &Identifier, - timer: &mut Option<Interval>, ) -> Result<(), IggyError> { let mut retries = 0; + let mut timer: Option<Interval> = None; + while !self.can_send.load(ORDERING) { retries += 1; if retries > max_retries { @@ -241,7 +228,9 @@ impl ProducerCore { but the client is disconnected. Retrying {retries}/{max_retries}..." ); - if let Some(timer) = timer.as_mut() { + if let Some(interval) = self.send_retries_interval { + let timer = + timer.get_or_insert_with(|| tokio::time::interval(interval.get_duration())); trace!( "Waiting for the next retry to send messages to topic: {topic}, \ stream: {stream} for disconnected client..." @@ -254,15 +243,16 @@ impl ProducerCore { async fn send_with_retries( &self, + client: &ClientWrapper, max_retries: u32, stream: &Identifier, topic: &Identifier, partitioning: &Arc<Partitioning>, messages: &mut [IggyMessage], - timer: &mut Option<Interval>, ) -> Result<(), IggyError> { - let client = self.client.read().await; let mut retries = 0; + let mut timer: Option<Interval> = None; + loop { match client .send_messages(stream, topic, partitioning, messages) @@ -284,12 +274,14 @@ impl ProducerCore { {error} Retrying {retries}/{max_retries}..." ); - if let Some(t) = timer.as_mut() { + if let Some(interval) = self.send_retries_interval { + let timer = timer + .get_or_insert_with(|| tokio::time::interval(interval.get_duration())); trace!( "Waiting for the next retry to send messages to topic: {topic}, \ stream: {stream}..." ); - t.tick().await; + timer.tick().await; } } } @@ -570,8 +562,8 @@ impl IggyProducer { } pub async fn shutdown(self) { - if let Some(disp) = self.dispatcher { - disp.shutdown().await; + if let Some(dispatcher) = self.dispatcher { + dispatcher.shutdown().await; } } } diff --git a/core/sdk/src/clients/producer_config.rs b/core/sdk/src/clients/producer_config.rs index dd3f301de..26d4bed6f 100644 --- a/core/sdk/src/clients/producer_config.rs +++ b/core/sdk/src/clients/producer_config.rs @@ -17,7 +17,7 @@ */ use crate::clients::MIB; use crate::clients::producer_error_callback::{ErrorCallback, LogErrorCallback}; -use crate::clients::producer_sharding::{BalancedSharding, Sharding}; +use crate::clients::producer_sharding::{OrderedSharding, Sharding}; use bon::Builder; use iggy_common::{IggyByteSize, IggyDuration}; use std::sync::Arc; @@ -67,10 +67,13 @@ pub enum BackpressureMode { pub struct BackgroundConfig { /// Number of shard-workers that run in parallel. /// - /// The default is `num_cpus::get().clamp(2, 16)`. - /// More shards increase throughput by parallelising - /// serialisation, compression and network I/O, but consume more memory. - #[builder(default = default_shard_count())] + /// With the default `OrderedSharding` strategy, messages to the same + /// stream/topic are always routed to the same shard, preserving ordering. + /// Increasing shards improves throughput only when sending to multiple streams/topics. + /// + /// With `BalancedSharding`, messages are distributed round-robin across all shards + /// for maximum single-destination throughput, but ordering is **not** preserved. + #[builder(default = 1)] pub num_shards: usize, /// How long a shard may wait before flushing an *incomplete* batch. /// @@ -84,26 +87,36 @@ pub struct BackgroundConfig { #[builder(default = Arc::new(Box::new(LogErrorCallback)))] pub error_callback: Arc<Box<dyn ErrorCallback + Send + Sync>>, /// Strategy that maps a message to a shard. - #[builder(default = Box::new(BalancedSharding::default()))] + /// + /// Default is `OrderedSharding` which routes all messages for the same + /// stream/topic to the same shard, preserving message ordering. + /// + /// Use `BalancedSharding` for maximum throughput when ordering doesn't matter. + #[builder(default = Box::new(OrderedSharding))] pub sharding: Box<dyn Sharding + Send + Sync>, - /// Maximum **total size in bytes** of a batch. + /// Maximum **total size in bytes** of a batch. /// `0` ⇒ unlimited (size-based batching disabled). #[builder(default = MIB)] pub batch_size: usize, - /// Maximum **number of messages** per batch. + /// Maximum **number of messages** per batch. /// `0` ⇒ unlimited (length-based batching disabled). #[builder(default = 1000)] pub batch_length: usize, /// Action to apply when back-pressure limits are reached #[builder(default = BackpressureMode::Block)] pub failure_mode: BackpressureMode, - /// Upper bound for the **bytes held in memory** across *all* shards. + /// Upper bound for the **bytes held in memory** across *all* shards. /// `IggyByteSize::from(0)` ⇒ unlimited. #[builder(default = IggyByteSize::from(32 * MIB as u64))] pub max_buffer_size: IggyByteSize, - /// Maximum number of **in-flight requests** (batches being sent). - /// `0` ⇒ unlimited. - #[builder(default = default_shard_count() * 2)] + /// Maximum number of **in-flight requests** (batches being sent). + /// + /// **WARNING**: Using more than 1 may cause message reordering if retries occur. + /// With max_in_flight > 1, a failed batch could be retried after later batches succeed. + /// + /// The default is `1` to preserve message ordering. + /// `0` ⇒ unlimited (no ordering guarantee). + #[builder(default = 1)] pub max_in_flight: usize, } @@ -134,11 +147,6 @@ pub struct DirectConfig { #[builder(default = 1000)] pub batch_length: u32, /// How long to wait for more messages before flushing the current set. - #[builder(default = IggyDuration::from(1000))] + #[builder(default = IggyDuration::from(0))] pub linger_time: IggyDuration, } - -fn default_shard_count() -> usize { - let cpus = num_cpus::get(); - cpus.clamp(2, 64) -} diff --git a/core/sdk/src/clients/producer_dispatcher.rs b/core/sdk/src/clients/producer_dispatcher.rs index fa6dded4c..c8ab06423 100644 --- a/core/sdk/src/clients/producer_dispatcher.rs +++ b/core/sdk/src/clients/producer_dispatcher.rs @@ -38,7 +38,12 @@ pub struct ProducerDispatcher { impl ProducerDispatcher { pub fn new(core: Arc<impl ProducerCoreBackend>, config: BackgroundConfig) -> Self { - let mut shards = Vec::with_capacity(config.num_shards); + let num_shards = if config.num_shards == 0 { + 1 + } else { + config.num_shards + }; + let mut shards = Vec::with_capacity(num_shards); let config = Arc::new(config); let (err_tx, err_rx) = flume::unbounded::<ErrorCtx>(); @@ -70,7 +75,7 @@ impl ProducerDispatcher { } }); - for _ in 0..config.num_shards { + for _ in 0..num_shards { let stop_rx = stop_tx.subscribe(); shards.push(Shard::new( core.clone(), diff --git a/core/sdk/src/clients/producer_sharding.rs b/core/sdk/src/clients/producer_sharding.rs index 8bdca2213..545212984 100644 --- a/core/sdk/src/clients/producer_sharding.rs +++ b/core/sdk/src/clients/producer_sharding.rs @@ -19,6 +19,8 @@ use crate::clients::producer::ProducerCoreBackend; use crate::clients::producer_config::BackgroundConfig; use crate::clients::producer_error_callback::ErrorCtx; use iggy_common::{Identifier, IggyByteSize, IggyError, IggyMessage, Partitioning, Sizeable}; +use std::hash::DefaultHasher; +use std::hash::{Hash, Hasher}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use tokio::sync::{OwnedSemaphorePermit, broadcast}; @@ -42,13 +44,16 @@ pub trait Sharding: Send + Sync + std::fmt::Debug + 'static { /// A simple round-robin sharding strategy. /// Distributes messages evenly across all shards by incrementing an atomic counter. +/// +/// **WARNING**: This strategy does NOT preserve message ordering across shards. +/// Messages to the same stream/topic may be processed out of order. +/// Use `OrderedSharding` if message ordering is required. #[derive(Default, Debug)] pub struct BalancedSharding { counter: AtomicUsize, } impl Sharding for BalancedSharding { - /// Picks the next shard in a round-robin fashion. fn pick_shard( &self, num_shards: usize, @@ -60,6 +65,29 @@ impl Sharding for BalancedSharding { } } +/// A sharding strategy that preserves message ordering by routing all messages +/// for the same stream/topic combination to the same shard. +/// +/// This ensures that messages sent to the same destination are processed +/// in the order they were dispatched, even when using multiple shards. +#[derive(Default, Debug)] +pub struct OrderedSharding; + +impl Sharding for OrderedSharding { + fn pick_shard( + &self, + num_shards: usize, + _: &[IggyMessage], + stream: &Identifier, + topic: &Identifier, + ) -> usize { + let mut hasher = DefaultHasher::new(); + stream.hash(&mut hasher); + topic.hash(&mut hasher); + (hasher.finish() as usize) % num_shards + } +} + #[derive(Debug)] pub struct ShardMessage { pub stream: Arc<Identifier>, @@ -169,6 +197,10 @@ impl Shard { } _ = stop_rx.recv() => { closed_clone.store(true, Ordering::Release); + while let Ok(msg) = rx.try_recv() { + buffer_bytes += msg.inner.get_size_bytes().as_bytes_usize(); + buffer.push(msg); + } if !buffer.is_empty() { Self::flush_buffer(&core, &mut buffer, &mut buffer_bytes, &err_sender).await; } @@ -191,7 +223,22 @@ impl Shard { buffer_bytes: &mut usize, err_sender: &flume::Sender<ErrorCtx>, ) { + if buffer.is_empty() { + return; + } + + let mut merged_batches: Vec<ShardMessageWithPermits> = Vec::new(); for msg in buffer.drain(..) { + if let Some(last) = merged_batches.last_mut() + && Self::same_destination(&last.inner, &msg.inner) + { + last.inner.messages.extend(msg.inner.messages); + continue; + } + merged_batches.push(msg); + } + + for msg in merged_batches { let result = core .send_internal( &msg.inner.stream, @@ -201,13 +248,13 @@ impl Shard { ) .await; - if let Err(err) = result { + if let Err(error) = result { if let IggyError::ProducerSendFailed { failed, cause, stream_name, topic_name, - } = &err + } = &error { let ctx = ErrorCtx { cause: cause.to_owned(), @@ -220,7 +267,7 @@ impl Shard { }; let _ = err_sender.send_async(ctx).await; } else { - tracing::error!("background send failed: {err}"); + error!("Background send failed. {error}"); } } } @@ -237,6 +284,12 @@ impl Shard { IggyError::BackgroundSendError }) } + + fn same_destination(first: &ShardMessage, second: &ShardMessage) -> bool { + first.stream == second.stream + && first.topic == second.topic + && first.partitioning == second.partitioning + } } #[cfg(test)] diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs index 5459b3125..ba374fe6b 100644 --- a/core/sdk/src/prelude.rs +++ b/core/sdk/src/prelude.rs @@ -40,6 +40,7 @@ pub use crate::clients::consumer_builder::IggyConsumerBuilder; pub use crate::clients::producer::IggyProducer; pub use crate::clients::producer_builder::IggyProducerBuilder; pub use crate::clients::producer_config::{BackgroundConfig, DirectConfig}; +pub use crate::clients::producer_sharding::{BalancedSharding, OrderedSharding, Sharding}; pub use crate::consumer_ext::IggyConsumerMessageExt; pub use crate::stream_builder::IggyConsumerConfig; pub use crate::stream_builder::IggyStreamConsumer; diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 26170f137..b01c7c95d 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -75,7 +75,7 @@ lending-iterator = "0.1.7" metadata = { workspace = true } mimalloc = { workspace = true, optional = true } mime_guess = { version = "2.0", optional = true } -moka = { version = "0.12.12", features = ["future"] } +moka = { version = "0.12.13", features = ["future"] } nix = { workspace = true } opentelemetry = { workspace = true } opentelemetry-appender-tracing = { workspace = true } @@ -95,7 +95,7 @@ send_wrapper = "0.6.0" serde = { workspace = true } serde_with = { workspace = true } slab = { workspace = true } -socket2 = "0.6.1" +socket2 = "0.6.2" static-toml = "1.3.0" strum = { workspace = true } sysinfo = { workspace = true }
