This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 486cdbcf8 fix(sdk): improve high-level SDK direct and background
producers (#2621)
486cdbcf8 is described below
commit 486cdbcf8c605c10d57dfbd45658e6f2164b1dfa
Author: Piotr Gankiewicz <[email protected]>
AuthorDate: Tue Jan 27 10:03:01 2026 +0100
fix(sdk): improve high-level SDK direct and background producers (#2621)
- Direct producer: removed redundant lock acquisition, lazy timer
initialization
- Background producer now preserves message ordering by default
(`OrderedSharding`, `max_in_flight = 1`)
- Added `shutdown()` to consumer - flushes pending offsets and leaves
consumer group gracefully
- Fixed message loss on shutdown - both producer and consumer now
drain/flush before exit
- Fixed `num_shards = 0` panic - treated as 1
- Updated Rust dependencies
---
Cargo.lock | 158 ++++++++++++++++------
Cargo.toml | 28 ++--
DEPENDENCIES.md | 27 ++--
core/ai/mcp/Cargo.toml | 4 +-
core/bench/Cargo.toml | 2 +-
core/bench/dashboard/server/Cargo.toml | 2 +-
core/binary_protocol/Cargo.toml | 2 +-
core/common/Cargo.toml | 4 +-
core/connectors/runtime/Cargo.toml | 2 +-
core/connectors/runtime/src/sink.rs | 8 +-
core/connectors/runtime/src/source.rs | 8 +-
core/connectors/sdk/Cargo.toml | 2 +-
core/integration/Cargo.toml | 4 +-
core/integration/src/test_mcp_server.rs | 1 +
core/integration/tests/mcp/mod.rs | 5 +-
core/integration/tests/sdk/producer/background.rs | 143 ++++++++++++++++++++
core/sdk/Cargo.toml | 3 +-
core/sdk/src/clients/consumer.rs | 71 +++++++++-
core/sdk/src/clients/producer.rs | 44 +++---
core/sdk/src/clients/producer_config.rs | 44 +++---
core/sdk/src/clients/producer_dispatcher.rs | 9 +-
core/sdk/src/clients/producer_sharding.rs | 61 ++++++++-
core/sdk/src/prelude.rs | 1 +
core/server/Cargo.toml | 6 +-
24 files changed, 492 insertions(+), 147 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index c8894b31d..4d342f57b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1093,7 +1093,7 @@ dependencies = [
"rand 0.9.2",
"serde",
"serde_json",
- "sysinfo",
+ "sysinfo 0.38.0",
"tracing",
"uuid",
]
@@ -1317,7 +1317,7 @@ version = "3.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89ec27229c38ed0eb3c0feee3d2c1d6a4379ae44f418a29a658890e062d8f365"
dependencies = [
- "darling 0.23.0",
+ "darling 0.20.11",
"ident_case",
"prettyplease",
"proc-macro2",
@@ -1665,7 +1665,7 @@ version = "3.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "faf9468729b8cbcea668e36183cb69d317348c2e08e994829fb56ebfdfbaac34"
dependencies = [
- "windows-sys 0.61.2",
+ "windows-sys 0.48.0",
]
[[package]]
@@ -2224,7 +2224,7 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "73736a89c4aff73035ba2ed2e565061954da00d4970fc9ac25dcc85a2a20d790"
dependencies = [
"dispatch2",
- "nix",
+ "nix 0.30.1",
"windows-sys 0.61.2",
]
@@ -2665,7 +2665,7 @@ dependencies = [
"libc",
"option-ext",
"redox_users",
- "windows-sys 0.61.2",
+ "windows-sys 0.59.0",
]
[[package]]
@@ -2969,7 +2969,7 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb"
dependencies = [
"libc",
- "windows-sys 0.61.2",
+ "windows-sys 0.59.0",
]
[[package]]
@@ -4175,7 +4175,7 @@ dependencies = [
"libc",
"percent-encoding",
"pin-project-lite",
- "socket2 0.6.2",
+ "socket2 0.5.10",
"tokio",
"tower-service",
"tracing",
@@ -4407,7 +4407,7 @@ dependencies = [
[[package]]
name = "iggy"
-version = "0.8.1-edge.5"
+version = "0.8.1-edge.6"
dependencies = [
"async-broadcast",
"async-dropper",
@@ -4421,7 +4421,6 @@ dependencies = [
"iggy_binary_protocol",
"iggy_common",
"mockall",
- "num_cpus",
"quinn",
"reqwest",
"reqwest-middleware",
@@ -4440,7 +4439,7 @@ dependencies = [
[[package]]
name = "iggy-bench"
-version = "0.3.1-edge.1"
+version = "0.3.1-edge.2"
dependencies = [
"async-trait",
"bench-report",
@@ -4458,7 +4457,7 @@ dependencies = [
"rand 0.9.2",
"rayon",
"serde",
- "sysinfo",
+ "sysinfo 0.38.0",
"tokio",
"tracing",
"tracing-appender",
@@ -4515,7 +4514,7 @@ dependencies = [
[[package]]
name = "iggy-connectors"
-version = "0.2.1-edge.4"
+version = "0.2.1-edge.5"
dependencies = [
"async-trait",
"axum",
@@ -4563,7 +4562,7 @@ dependencies = [
[[package]]
name = "iggy-mcp"
-version = "0.2.1-edge.4"
+version = "0.2.1-edge.5"
dependencies = [
"axum",
"axum-server",
@@ -4594,7 +4593,7 @@ dependencies = [
[[package]]
name = "iggy_binary_protocol"
-version = "0.8.1-edge.2"
+version = "0.8.1-edge.3"
dependencies = [
"anyhow",
"async-broadcast",
@@ -4615,7 +4614,7 @@ dependencies = [
[[package]]
name = "iggy_common"
-version = "0.8.1-edge.1"
+version = "0.8.1-edge.2"
dependencies = [
"aes-gcm",
"ahash 0.8.12",
@@ -4639,7 +4638,7 @@ dependencies = [
"figment",
"human-repr",
"humantime",
- "nix",
+ "nix 0.31.1",
"once_cell",
"rcgen",
"ring",
@@ -4786,7 +4785,7 @@ dependencies = [
[[package]]
name = "iggy_connector_sdk"
-version = "0.1.1-edge.2"
+version = "0.1.1-edge.3"
dependencies = [
"async-trait",
"base64 0.22.1",
@@ -5089,7 +5088,7 @@ dependencies = [
"portable-atomic",
"portable-atomic-util",
"serde_core",
- "windows-sys 0.61.2",
+ "windows-sys 0.59.0",
]
[[package]]
@@ -5782,6 +5781,18 @@ dependencies = [
"libc",
]
+[[package]]
+name = "nix"
+version = "0.31.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "225e7cfe711e0ba79a68baeddb2982723e4235247aefce1482f2f16c27865b66"
+dependencies = [
+ "bitflags 2.10.0",
+ "cfg-if",
+ "cfg_aliases",
+ "libc",
+]
+
[[package]]
name = "nom"
version = "7.1.3"
@@ -5893,7 +5904,7 @@ version = "0.50.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
dependencies = [
- "windows-sys 0.61.2",
+ "windows-sys 0.59.0",
]
[[package]]
@@ -6349,7 +6360,7 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d8fae84b431384b68627d0f9b3b1245fcf9f46f6c0e3dc902e9dce64edd1967"
dependencies = [
"libc",
- "windows-sys 0.61.2",
+ "windows-sys 0.48.0",
]
[[package]]
@@ -7070,7 +7081,7 @@ dependencies = [
"quinn-udp",
"rustc-hash",
"rustls",
- "socket2 0.6.2",
+ "socket2 0.5.10",
"thiserror 2.0.18",
"tokio",
"tracing",
@@ -7109,9 +7120,9 @@ dependencies = [
"cfg_aliases",
"libc",
"once_cell",
- "socket2 0.6.2",
+ "socket2 0.5.10",
"tracing",
- "windows-sys 0.60.2",
+ "windows-sys 0.59.0",
]
[[package]]
@@ -7532,9 +7543,9 @@ dependencies = [
[[package]]
name = "rmcp"
-version = "0.13.0"
+version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d1815dbc06c414d720f8bc1951eccd66bc99efc6376331f1e7093a119b3eb508"
+checksum = "0a621b37a548ff6ab6292d57841eb25785a7f146d89391a19c9f199414bd13da"
dependencies = [
"async-trait",
"axum",
@@ -7565,9 +7576,9 @@ dependencies = [
[[package]]
name = "rmcp-macros"
-version = "0.13.0"
+version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "11f0bc7008fa102e771a76c6d2c9b253be3f2baa5964e060464d038ae1cbc573"
+checksum = "6b79ed92303f9262db79575aa8c3652581668e9d136be6fd0b9ededa78954c95"
dependencies = [
"darling 0.23.0",
"proc-macro2",
@@ -7725,7 +7736,7 @@ dependencies = [
"errno",
"libc",
"linux-raw-sys",
- "windows-sys 0.61.2",
+ "windows-sys 0.59.0",
]
[[package]]
@@ -7793,7 +7804,7 @@ dependencies = [
"security-framework",
"security-framework-sys",
"webpki-root-certs",
- "windows-sys 0.61.2",
+ "windows-sys 0.59.0",
]
[[package]]
@@ -7979,7 +7990,7 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b55fb86dfd3a2f5f76ea78310a88f96c4ea21a3031f8d212443d56123fd0521"
dependencies = [
"libc",
- "windows-sys 0.61.2",
+ "windows-sys 0.59.0",
]
[[package]]
@@ -8206,7 +8217,7 @@ dependencies = [
[[package]]
name = "server"
-version = "0.6.1-edge.5"
+version = "0.6.1-edge.6"
dependencies = [
"ahash 0.8.12",
"anyhow",
@@ -8247,7 +8258,7 @@ dependencies = [
"mimalloc",
"mime_guess",
"moka",
- "nix",
+ "nix 0.31.1",
"opentelemetry",
"opentelemetry-appender-tracing",
"opentelemetry-otlp",
@@ -8269,7 +8280,7 @@ dependencies = [
"socket2 0.6.2",
"static-toml",
"strum",
- "sysinfo",
+ "sysinfo 0.38.0",
"tempfile",
"thiserror 2.0.18",
"tokio",
@@ -8887,7 +8898,21 @@ dependencies = [
"ntapi",
"objc2-core-foundation",
"objc2-io-kit",
- "windows",
+ "windows 0.61.3",
+]
+
+[[package]]
+name = "sysinfo"
+version = "0.38.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fe840c5b1afe259a5657392a4dbb74473a14c8db999c3ec2f4ae812e028a94da"
+dependencies = [
+ "libc",
+ "memchr",
+ "ntapi",
+ "objc2-core-foundation",
+ "objc2-io-kit",
+ "windows 0.62.2",
]
[[package]]
@@ -8923,7 +8948,7 @@ dependencies = [
"getrandom 0.3.4",
"once_cell",
"rustix",
- "windows-sys 0.61.2",
+ "windows-sys 0.59.0",
]
[[package]]
@@ -9884,7 +9909,7 @@ dependencies = [
"regex",
"rustc_version",
"rustversion",
- "sysinfo",
+ "sysinfo 0.37.2",
"time",
"vergen-lib",
]
@@ -10148,7 +10173,7 @@ version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
dependencies = [
- "windows-sys 0.61.2",
+ "windows-sys 0.48.0",
]
[[package]]
@@ -10163,11 +10188,23 @@ version = "0.61.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9babd3a767a4c1aef6900409f85f5d53ce2544ccdfaa86dad48c91782c6d6893"
dependencies = [
- "windows-collections",
+ "windows-collections 0.2.0",
"windows-core 0.61.2",
- "windows-future",
+ "windows-future 0.2.1",
"windows-link 0.1.3",
- "windows-numerics",
+ "windows-numerics 0.2.0",
+]
+
+[[package]]
+name = "windows"
+version = "0.62.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "527fadee13e0c05939a6a05d5bd6eec6cd2e3dbd648b9f8e447c6518133d8580"
+dependencies = [
+ "windows-collections 0.3.2",
+ "windows-core 0.62.2",
+ "windows-future 0.3.2",
+ "windows-numerics 0.3.1",
]
[[package]]
@@ -10179,6 +10216,15 @@ dependencies = [
"windows-core 0.61.2",
]
+[[package]]
+name = "windows-collections"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "23b2d95af1a8a14a3c7367e1ed4fc9c20e0a26e79551b1454d72583c97cc6610"
+dependencies = [
+ "windows-core 0.62.2",
+]
+
[[package]]
name = "windows-core"
version = "0.61.2"
@@ -10213,7 +10259,18 @@ checksum =
"fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e"
dependencies = [
"windows-core 0.61.2",
"windows-link 0.1.3",
- "windows-threading",
+ "windows-threading 0.1.0",
+]
+
+[[package]]
+name = "windows-future"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e1d6f90251fe18a279739e78025bd6ddc52a7e22f921070ccdc67dde84c605cb"
+dependencies = [
+ "windows-core 0.62.2",
+ "windows-link 0.2.1",
+ "windows-threading 0.2.1",
]
[[package]]
@@ -10260,6 +10317,16 @@ dependencies = [
"windows-link 0.1.3",
]
+[[package]]
+name = "windows-numerics"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6e2e40844ac143cdb44aead537bbf727de9b044e107a0f1220392177d15b0f26"
+dependencies = [
+ "windows-core 0.62.2",
+ "windows-link 0.2.1",
+]
+
[[package]]
name = "windows-result"
version = "0.3.4"
@@ -10422,6 +10489,15 @@ dependencies = [
"windows-link 0.1.3",
]
+[[package]]
+name = "windows-threading"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3949bd5b99cafdf1c7ca86b43ca564028dfe27d66958f2470940f73d86d75b37"
+dependencies = [
+ "windows-link 0.2.1",
+]
+
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.42.2"
diff --git a/Cargo.toml b/Cargo.toml
index d845a659a..129bc2203 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -58,9 +58,9 @@ aes-gcm = "0.10.3"
ahash = { version = "0.8.12", features = ["serde"] }
anyhow = "1.0.100"
argon2 = "0.5.3"
-arrow = "57.0.0"
-arrow-array = "57.0.0"
-arrow-json = "57.0.0"
+arrow = "57.2.0"
+arrow-array = "57.2.0"
+arrow-json = "57.2.0"
async-broadcast = "0.7.2"
async-channel = "2.5.0"
async-dropper = { version = "0.3.1", features = ["tokio", "simple"] }
@@ -135,10 +135,10 @@ humantime = "2.3.0"
hwlocality = "1.0.0-alpha.11"
iceberg = "0.8.0"
iceberg-catalog-rest = "0.8.0"
-iggy = { path = "core/sdk", version = "0.8.1-edge.1" }
-iggy_binary_protocol = { path = "core/binary_protocol", version =
"0.8.1-edge.1" }
-iggy_common = { path = "core/common", version = "0.8.1-edge.1" }
-iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.1.1-edge.1" }
+iggy = { path = "core/sdk", version = "0.8.1-edge.6" }
+iggy_binary_protocol = { path = "core/binary_protocol", version =
"0.8.1-edge.3" }
+iggy_common = { path = "core/common", version = "0.8.1-edge.2" }
+iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.1.1-edge.3" }
integration = { path = "core/integration" }
keyring = { version = "3.6.3", features = ["sync-secret-service", "vendored"] }
lazy_static = "1.5.0"
@@ -147,7 +147,7 @@ log = "0.4.29"
metadata = { path = "core/metadata" }
mimalloc = "0.1"
mockall = "0.14.0"
-nix = { version = "0.30.1", features = ["fs", "resource", "sched"] }
+nix = { version = "0.31.1", features = ["fs", "resource", "sched"] }
nonzero_lit = "0.1.2"
once_cell = "1.21.3"
opentelemetry = { version = "0.31.0", features = ["trace", "logs"] }
@@ -168,7 +168,7 @@ opentelemetry_sdk = { version = "0.31.0", features = [
"experimental_logs_batch_log_processor_with_async_runtime",
"experimental_trace_batch_span_processor_with_async_runtime",
] }
-parquet = "57.0.0"
+parquet = "57.2.0"
passterm = "=2.0.1"
postcard = { version = "1.1.3", features = ["alloc"] }
@@ -197,13 +197,13 @@ serde_yaml_ng = "0.10.0"
serial_test = "3.3.1"
server = { path = "core/server" }
simd-json = { version = "0.17.0", features = ["serde_impl"] }
-slab = "0.4.1"
+slab = "0.4.11"
strum = { version = "0.27.2", features = ["derive"] }
strum_macros = "0.27.2"
-sysinfo = "0.37.2"
+sysinfo = "0.38.0"
tempfile = "3.24.0"
test-case = "3.3.1"
-thiserror = "2.0.17"
+thiserror = "2.0.18"
tokio = { version = "1.49.0", features = ["full"] }
tokio-rustls = "0.26.4"
tokio-tungstenite = { version = "0.28", features = ["rustls-tls-webpki-roots"]
}
@@ -225,7 +225,7 @@ tracing-subscriber = { version = "0.3.22", default-features
= false, features =
trait-variant = "0.1.2"
tungstenite = "0.28.0"
twox-hash = { version = "2.1.2", features = ["xxhash32"] }
-uuid = { version = "1.19.0", features = [
+uuid = { version = "1.20.0", features = [
"v4",
"v7",
"fast-rng",
@@ -233,7 +233,7 @@ uuid = { version = "1.19.0", features = [
"zerocopy",
] }
webpki-roots = "1.0.5"
-zip = { version = "7.1.0", default-features = false, features = ["deflate"] }
+zip = { version = "7.2.0", default-features = false, features = ["deflate"] }
[profile.release]
lto = true
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 7d4f8133a..1320800e7 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -383,14 +383,14 @@ icu_provider: 2.1.1, "Unicode-3.0",
ident_case: 1.0.1, "Apache-2.0 OR MIT",
idna: 1.1.0, "Apache-2.0 OR MIT",
idna_adapter: 1.2.1, "Apache-2.0 OR MIT",
-iggy: 0.8.1-edge.5, "Apache-2.0",
-iggy-bench: 0.3.1-edge.1, "Apache-2.0",
+iggy: 0.8.1-edge.6, "Apache-2.0",
+iggy-bench: 0.3.1-edge.2, "Apache-2.0",
iggy-bench-dashboard-server: 0.5.1-edge.1, "Apache-2.0",
iggy-cli: 0.10.1-edge.1, "Apache-2.0",
-iggy-connectors: 0.2.1-edge.4, "Apache-2.0",
-iggy-mcp: 0.2.1-edge.4, "Apache-2.0",
-iggy_binary_protocol: 0.8.1-edge.2, "Apache-2.0",
-iggy_common: 0.8.1-edge.1, "Apache-2.0",
+iggy-connectors: 0.2.1-edge.5, "Apache-2.0",
+iggy-mcp: 0.2.1-edge.5, "Apache-2.0",
+iggy_binary_protocol: 0.8.1-edge.3, "Apache-2.0",
+iggy_common: 0.8.1-edge.2, "Apache-2.0",
iggy_connector_elasticsearch_sink: 0.2.0-edge.1, "Apache-2.0",
iggy_connector_elasticsearch_source: 0.2.0-edge.1, "Apache-2.0",
iggy_connector_iceberg_sink: 0.2.0-edge.1, "Apache-2.0",
@@ -398,7 +398,7 @@ iggy_connector_postgres_sink: 0.2.0-edge.1, "Apache-2.0",
iggy_connector_postgres_source: 0.2.0-edge.1, "Apache-2.0",
iggy_connector_quickwit_sink: 0.2.0-edge.1, "Apache-2.0",
iggy_connector_random_source: 0.2.0-edge.1, "Apache-2.0",
-iggy_connector_sdk: 0.1.1-edge.2, "Apache-2.0",
+iggy_connector_sdk: 0.1.1-edge.3, "Apache-2.0",
iggy_connector_stdout_sink: 0.2.0-edge.1, "Apache-2.0",
iggy_examples: 0.0.5, "Apache-2.0",
ignore: 0.4.25, "MIT OR Unlicense",
@@ -496,6 +496,7 @@ moka: 0.12.13, "(Apache-2.0 OR MIT) AND Apache-2.0",
murmur3: 0.5.2, "Apache-2.0 OR MIT",
never-say-never: 6.6.666, "Apache-2.0 OR MIT OR Zlib",
nix: 0.30.1, "MIT",
+nix: 0.31.1, "MIT",
nom: 7.1.3, "MIT",
nom: 8.0.0, "MIT",
nom_locate: 5.0.0, "MIT",
@@ -654,8 +655,8 @@ ring: 0.17.14, "Apache-2.0 AND ISC",
ringbuffer: 0.16.0, "MIT",
rkyv: 0.7.46, "MIT",
rkyv_derive: 0.7.46, "MIT",
-rmcp: 0.13.0, "Apache-2.0",
-rmcp-macros: 0.13.0, "Apache-2.0",
+rmcp: 0.14.0, "Apache-2.0",
+rmcp-macros: 0.14.0, "Apache-2.0",
rmp: 0.8.15, "MIT",
rmp-serde: 1.3.1, "MIT",
roaring: 0.11.3, "Apache-2.0 OR MIT",
@@ -715,7 +716,7 @@ serde_with_macros: 3.16.1, "Apache-2.0 OR MIT",
serde_yaml_ng: 0.10.0, "MIT",
serial_test: 3.3.1, "MIT",
serial_test_derive: 3.3.1, "MIT",
-server: 0.6.1-edge.5, "Apache-2.0",
+server: 0.6.1-edge.6, "Apache-2.0",
sha1: 0.10.6, "Apache-2.0 OR MIT",
sha2: 0.10.9, "Apache-2.0 OR MIT",
sha3: 0.10.8, "Apache-2.0 OR MIT",
@@ -765,6 +766,7 @@ synthez: 0.4.0, "BlueOak-1.0.0",
synthez-codegen: 0.4.0, "BlueOak-1.0.0",
synthez-core: 0.4.0, "BlueOak-1.0.0",
sysinfo: 0.37.2, "MIT",
+sysinfo: 0.38.0, "MIT",
tagptr: 0.2.0, "Apache-2.0 OR MIT",
tap: 1.0.1, "MIT",
tar: 0.4.44, "Apache-2.0 OR MIT",
@@ -890,15 +892,19 @@ winapi-i686-pc-windows-gnu: 0.4.0, "Apache-2.0 OR MIT",
winapi-util: 0.1.11, "MIT OR Unlicense",
winapi-x86_64-pc-windows-gnu: 0.4.0, "Apache-2.0 OR MIT",
windows: 0.61.3, "Apache-2.0 OR MIT",
+windows: 0.62.2, "Apache-2.0 OR MIT",
windows-collections: 0.2.0, "Apache-2.0 OR MIT",
+windows-collections: 0.3.2, "Apache-2.0 OR MIT",
windows-core: 0.61.2, "Apache-2.0 OR MIT",
windows-core: 0.62.2, "Apache-2.0 OR MIT",
windows-future: 0.2.1, "Apache-2.0 OR MIT",
+windows-future: 0.3.2, "Apache-2.0 OR MIT",
windows-implement: 0.60.2, "Apache-2.0 OR MIT",
windows-interface: 0.59.3, "Apache-2.0 OR MIT",
windows-link: 0.1.3, "Apache-2.0 OR MIT",
windows-link: 0.2.1, "Apache-2.0 OR MIT",
windows-numerics: 0.2.0, "Apache-2.0 OR MIT",
+windows-numerics: 0.3.1, "Apache-2.0 OR MIT",
windows-result: 0.3.4, "Apache-2.0 OR MIT",
windows-result: 0.4.1, "Apache-2.0 OR MIT",
windows-strings: 0.4.2, "Apache-2.0 OR MIT",
@@ -914,6 +920,7 @@ windows-targets: 0.48.5, "Apache-2.0 OR MIT",
windows-targets: 0.52.6, "Apache-2.0 OR MIT",
windows-targets: 0.53.5, "Apache-2.0 OR MIT",
windows-threading: 0.1.0, "Apache-2.0 OR MIT",
+windows-threading: 0.2.1, "Apache-2.0 OR MIT",
windows_aarch64_gnullvm: 0.42.2, "Apache-2.0 OR MIT",
windows_aarch64_gnullvm: 0.48.5, "Apache-2.0 OR MIT",
windows_aarch64_gnullvm: 0.52.6, "Apache-2.0 OR MIT",
diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml
index f8f8c138e..d9c8b9ad9 100644
--- a/core/ai/mcp/Cargo.toml
+++ b/core/ai/mcp/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy-mcp"
-version = "0.2.1-edge.4"
+version = "0.2.1-edge.5"
description = "MCP Server for Iggy message streaming platform"
edition = "2024"
license = "Apache-2.0"
@@ -41,7 +41,7 @@ opentelemetry-otlp = { workspace = true }
opentelemetry-semantic-conventions = { workspace = true }
opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] }
reqwest = { workspace = true }
-rmcp = { version = "0.13.0", features = [
+rmcp = { version = "0.14.0", features = [
"server",
"transport-io",
"transport-streamable-http-server",
diff --git a/core/bench/Cargo.toml b/core/bench/Cargo.toml
index 42ed50e44..bbfbae035 100644
--- a/core/bench/Cargo.toml
+++ b/core/bench/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy-bench"
-version = "0.3.1-edge.1"
+version = "0.3.1-edge.2"
edition = "2024"
license = "Apache-2.0"
repository = "https://github.com/apache/iggy"
diff --git a/core/bench/dashboard/server/Cargo.toml
b/core/bench/dashboard/server/Cargo.toml
index 6303a663a..0ae74810b 100644
--- a/core/bench/dashboard/server/Cargo.toml
+++ b/core/bench/dashboard/server/Cargo.toml
@@ -30,7 +30,7 @@ bench-report = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
clap = { workspace = true }
dashmap = { workspace = true }
-file-operation = "0.8.11"
+file-operation = "0.8.17"
notify = "8.2.0"
octocrab = "0.49.5"
serde = { workspace = true, features = ["derive"] }
diff --git a/core/binary_protocol/Cargo.toml b/core/binary_protocol/Cargo.toml
index bb10cb841..c015b5257 100644
--- a/core/binary_protocol/Cargo.toml
+++ b/core/binary_protocol/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy_binary_protocol"
-version = "0.8.1-edge.2"
+version = "0.8.1-edge.3"
description = "Iggy is the persistent message streaming platform written in
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing
millions of messages per second."
edition = "2024"
license = "Apache-2.0"
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index a529c1ab3..52ea4c5ea 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -16,7 +16,7 @@
# under the License.
[package]
name = "iggy_common"
-version = "0.8.1-edge.1"
+version = "0.8.1-edge.2"
description = "Iggy is the persistent message streaming platform written in
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing
millions of messages per second."
edition = "2024"
license = "Apache-2.0"
@@ -57,7 +57,7 @@ human-repr = { workspace = true }
humantime = { workspace = true }
nix = { workspace = true }
once_cell = { workspace = true }
-rcgen = "0.14.6"
+rcgen = "0.14.7"
ring = "0.17.14"
rustls = { workspace = true }
serde = { workspace = true }
diff --git a/core/connectors/runtime/Cargo.toml
b/core/connectors/runtime/Cargo.toml
index 0ec17f5b3..dea88451a 100644
--- a/core/connectors/runtime/Cargo.toml
+++ b/core/connectors/runtime/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy-connectors"
-version = "0.2.1-edge.4"
+version = "0.2.1-edge.5"
description = "Connectors runtime for Iggy message streaming platform"
edition = "2024"
license = "Apache-2.0"
diff --git a/core/connectors/runtime/src/sink.rs
b/core/connectors/runtime/src/sink.rs
index 3f55d4d8a..40b41d886 100644
--- a/core/connectors/runtime/src/sink.rs
+++ b/core/connectors/runtime/src/sink.rs
@@ -181,16 +181,14 @@ pub async fn init(
pub fn consume(sinks: Vec<SinkConnectorWrapper>, context: Arc<RuntimeContext>)
{
for sink in sinks {
for plugin in sink.plugins {
- if plugin.error.is_none() {
- info!("Starting consume for sink with ID: {}...", plugin.id);
- } else {
+ if let Some(error) = &plugin.error {
error!(
- "Failed to initialize sink connector with ID: {}: {}.
Skipping...",
+ "Failed to initialize sink connector with ID: {}: {error}.
Skipping...",
plugin.id,
- plugin.error.as_ref().expect("Error should be present")
);
continue;
}
+ info!("Starting consume for sink with ID: {}...", plugin.id);
for consumer in plugin.consumers {
let plugin_key = plugin.key.clone();
let context = context.clone();
diff --git a/core/connectors/runtime/src/source.rs
b/core/connectors/runtime/src/source.rs
index d366a5de5..fba517989 100644
--- a/core/connectors/runtime/src/source.rs
+++ b/core/connectors/runtime/src/source.rs
@@ -226,15 +226,13 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>,
context: Arc<RuntimeContext>
let plugin_key = plugin.key.clone();
let context = context.clone();
- if plugin.error.is_none() {
- info!("Starting handler for source connector with ID:
{plugin_id}...");
- } else {
+ if let Some(error) = &plugin.error {
error!(
- "Failed to initialize source connector with ID:
{plugin_id}: {}. Skipping...",
- plugin.error.as_ref().expect("Error should be present")
+ "Failed to initialize source connector with ID:
{plugin_id}: {error}. Skipping...",
);
continue;
}
+ info!("Starting handler for source connector with ID:
{plugin_id}...");
let handle = source.callback;
tokio::task::spawn_blocking(move || {
diff --git a/core/connectors/sdk/Cargo.toml b/core/connectors/sdk/Cargo.toml
index 72bafef01..530c5ba56 100644
--- a/core/connectors/sdk/Cargo.toml
+++ b/core/connectors/sdk/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy_connector_sdk"
-version = "0.1.1-edge.2"
+version = "0.1.1-edge.3"
description = "Iggy is the persistent message streaming platform written in
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing
millions of messages per second."
edition = "2024"
license = "Apache-2.0"
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index 6dec491f5..87b95b6f1 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -47,9 +47,9 @@ libc = "0.2.180"
log = { workspace = true }
predicates = { workspace = true }
rand = { workspace = true }
-rcgen = "0.14.6"
+rcgen = "0.14.7"
reqwest = { workspace = true }
-rmcp = { version = "0.13.0", features = [
+rmcp = { version = "0.14.0", features = [
"client",
"reqwest",
"transport-streamable-http-client",
diff --git a/core/integration/src/test_mcp_server.rs
b/core/integration/src/test_mcp_server.rs
index 9d729eaa8..f4179a698 100644
--- a/core/integration/src/test_mcp_server.rs
+++ b/core/integration/src/test_mcp_server.rs
@@ -182,6 +182,7 @@ impl TestMcpServer {
let client_info = ClientInfo {
protocol_version: Default::default(),
capabilities: ClientCapabilities::default(),
+ meta: None,
client_info: Implementation {
name: "test-mcp-client".to_string(),
version: "1.0.0".to_string(),
diff --git a/core/integration/tests/mcp/mod.rs
b/core/integration/tests/mcp/mod.rs
index 3fab31930..0fadd7ea3 100644
--- a/core/integration/tests/mcp/mod.rs
+++ b/core/integration/tests/mcp/mod.rs
@@ -35,7 +35,7 @@ use integration::{
use lazy_static::lazy_static;
use rmcp::{
ServiceError,
- model::{CallToolRequestParam, CallToolResult, ListToolsResult},
+ model::{CallToolRequestParams, CallToolResult, ListToolsResult},
serde::de::DeserializeOwned,
serde_json::{self, json},
};
@@ -698,10 +698,11 @@ impl TestMcpClient {
data: Option<serde_json::Value>,
) -> Result<CallToolResult, ServiceError> {
self.mcp_client
- .call_tool(CallToolRequestParam {
+ .call_tool(CallToolRequestParams {
name: method.to_owned().into(),
arguments: data.and_then(|value| value.as_object().cloned()),
task: None,
+ meta: None,
})
.await
}
diff --git a/core/integration/tests/sdk/producer/background.rs
b/core/integration/tests/sdk/producer/background.rs
index d8d7818ae..8acb05de2 100644
--- a/core/integration/tests/sdk/producer/background.rs
+++ b/core/integration/tests/sdk/producer/background.rs
@@ -493,3 +493,146 @@ async fn background_linger_time_respected_after_idle() {
producer.shutdown().await;
cleanup(&client).await;
}
+
+#[tokio::test]
+#[parallel]
+async fn background_preserves_message_ordering() {
+ let mut test_server = TestServer::default();
+ test_server.start();
+
+ let tcp_client_config = TcpClientConfig {
+ server_address: test_server.get_raw_tcp_addr().unwrap(),
+ ..TcpClientConfig::default()
+ };
+ let client =
ClientWrapper::Tcp(TcpClient::create(Arc::new(tcp_client_config)).unwrap());
+ let client = IggyClient::create(client, None, None);
+
+ client.connect().await.unwrap();
+ login_root(&client).await;
+ init_system(&client).await;
+
+ let messages_count = 1000u32;
+
+ let cfg = BackgroundConfig::builder()
+ .linger_time(IggyDuration::from(50_000))
+ .build();
+
+ let producer = client
+ .producer(STREAM_NAME, TOPIC_NAME)
+ .unwrap()
+ .partitioning(Partitioning::partition_id(PARTITION_ID))
+ .background(cfg)
+ .build();
+
+ for i in 0..messages_count {
+ let payload = Bytes::from(i.to_le_bytes().to_vec());
+ let msg = IggyMessage::builder()
+ .id(i as u128 + 1)
+ .payload(payload)
+ .build()
+ .unwrap();
+ producer.send(vec![msg]).await.unwrap();
+ }
+
+ producer.shutdown().await;
+
+ let consumer = Consumer::default();
+ let polled_messages = client
+ .poll_messages(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ Some(PARTITION_ID),
+ &consumer,
+ &PollingStrategy::offset(0),
+ messages_count,
+ false,
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(polled_messages.messages.len() as u32, messages_count);
+
+ for (idx, msg) in polled_messages.messages.iter().enumerate() {
+ let expected = idx as u32;
+ let actual = u32::from_le_bytes(msg.payload[..4].try_into().unwrap());
+ assert_eq!(
+ actual, expected,
+ "Message at position {} has wrong order: expected {}, got {}",
+ idx, expected, actual
+ );
+ }
+
+ cleanup(&client).await;
+}
+
+#[tokio::test]
+#[parallel]
+async fn background_preserves_ordering_with_multiple_shards() {
+ let mut test_server = TestServer::default();
+ test_server.start();
+
+ let tcp_client_config = TcpClientConfig {
+ server_address: test_server.get_raw_tcp_addr().unwrap(),
+ ..TcpClientConfig::default()
+ };
+ let client =
ClientWrapper::Tcp(TcpClient::create(Arc::new(tcp_client_config)).unwrap());
+ let client = IggyClient::create(client, None, None);
+
+ client.connect().await.unwrap();
+ login_root(&client).await;
+ init_system(&client).await;
+
+ let messages_count = 1000u32;
+
+ let cfg = BackgroundConfig::builder()
+ .num_shards(4)
+ .linger_time(IggyDuration::from(50_000))
+ .build();
+
+ let producer = client
+ .producer(STREAM_NAME, TOPIC_NAME)
+ .unwrap()
+ .partitioning(Partitioning::partition_id(PARTITION_ID))
+ .background(cfg)
+ .build();
+
+ for i in 0..messages_count {
+ let payload = Bytes::from(i.to_le_bytes().to_vec());
+ let msg = IggyMessage::builder()
+ .id(i as u128 + 1)
+ .payload(payload)
+ .build()
+ .unwrap();
+ producer.send(vec![msg]).await.unwrap();
+ }
+
+ producer.shutdown().await;
+
+ let consumer = Consumer::default();
+ let polled_messages = client
+ .poll_messages(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ Some(PARTITION_ID),
+ &consumer,
+ &PollingStrategy::offset(0),
+ messages_count,
+ false,
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(polled_messages.messages.len() as u32, messages_count);
+
+ for (idx, msg) in polled_messages.messages.iter().enumerate() {
+ let expected = idx as u32;
+ let actual = u32::from_le_bytes(msg.payload[..4].try_into().unwrap());
+ assert_eq!(
+ actual, expected,
+ "Message at position {} has wrong order: expected {}, got {}",
+ idx, expected, actual
+ );
+ }
+
+ cleanup(&client).await;
+}
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index c83cfc06b..6d33647bb 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy"
-version = "0.8.1-edge.5"
+version = "0.8.1-edge.6"
description = "Iggy is the persistent message streaming platform written in
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing
millions of messages per second."
edition = "2024"
license = "Apache-2.0"
@@ -48,7 +48,6 @@ futures = { workspace = true }
futures-util = { workspace = true }
iggy_binary_protocol = { workspace = true }
iggy_common = { workspace = true }
-num_cpus = "1.17.0"
quinn = { workspace = true }
reqwest = { workspace = true }
reqwest-middleware = { workspace = true }
diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs
index 3d7784938..81be663a7 100644
--- a/core/sdk/src/clients/consumer.rs
+++ b/core/sdk/src/clients/consumer.rs
@@ -475,10 +475,6 @@ impl IggyConsumer {
tokio::spawn(async move {
loop {
sleep(interval.get_duration()).await;
- if shutdown.load(ORDERING) {
- trace!("Shutdown signal received, stopping background
offset storage");
- break;
- }
for entry in last_consumed_offsets.iter() {
let partition_id = *entry.key();
let consumed_offset = entry.load(ORDERING);
@@ -494,6 +490,10 @@ impl IggyConsumer {
)
.await;
}
+ if shutdown.load(ORDERING) {
+ trace!("Shutdown signal received, stopping background
offset storage");
+ break;
+ }
}
});
}
@@ -1056,6 +1056,69 @@ impl Stream for IggyConsumer {
}
}
+impl IggyConsumer {
+ pub async fn shutdown(&mut self) -> Result<(), IggyError> {
+ if self.shutdown.swap(true, ORDERING) {
+ return Ok(());
+ }
+
+ info!("Shutting down consumer: {}...", self.consumer_name);
+
+ for entry in self.last_consumed_offsets.iter() {
+ let partition_id = *entry.key();
+ let consumed_offset = entry.load(ORDERING);
+
+ let stored_offset = self
+ .last_stored_offsets
+ .get(&partition_id)
+ .map(|e| e.load(ORDERING))
+ .unwrap_or(0);
+
+ if consumed_offset > stored_offset {
+ trace!(
+ "Flushing final offset: {consumed_offset} for partition:
{partition_id}, stream: {}, topic: {}",
+ self.stream_id, self.topic_id
+ );
+ let _ = Self::store_consumer_offset(
+ &self.client,
+ &self.consumer,
+ &self.stream_id,
+ &self.topic_id,
+ partition_id,
+ consumed_offset,
+ &self.last_stored_offsets,
+ self.allow_replay,
+ )
+ .await;
+ }
+ }
+
+ if self.is_consumer_group && self.joined_consumer_group.load(ORDERING)
{
+ let group_id = self.consumer.id.clone();
+ trace!(
+ "Leaving consumer group: {group_id} for stream: {}, topic: {}",
+ self.stream_id, self.topic_id
+ );
+
+ let client = self.client.read().await;
+ if let Err(error) = client
+ .leave_consumer_group(&self.stream_id, &self.topic_id,
&group_id)
+ .await
+ {
+ warn!(
+ "Failed to leave consumer group: {group_id} for stream:
{}, topic: {}. {error}",
+ self.stream_id, self.topic_id
+ );
+ } else {
+ self.joined_consumer_group.store(false, ORDERING);
+ }
+ }
+
+ info!("Consumer: {} has been shut down.", self.consumer_name);
+ Ok(())
+ }
+}
+
impl Drop for IggyConsumer {
fn drop(&mut self) {
self.shutdown.store(true, ORDERING);
diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index 0a6ec018e..67a5bf8d4 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -185,6 +185,7 @@ impl ProducerCore {
messages: &mut [IggyMessage],
) -> Result<(), IggyError> {
let client = self.client.read().await;
+
let Some(max_retries) = self.send_retries_count else {
return client
.send_messages(stream, topic, partitioning, messages)
@@ -197,25 +198,10 @@ impl ProducerCore {
.await;
}
- let mut timer = if let Some(interval) = self.send_retries_interval {
- let mut timer = tokio::time::interval(interval.get_duration());
- timer.tick().await;
- Some(timer)
- } else {
- None
- };
-
- self.wait_until_connected(max_retries, stream, topic, &mut timer)
+ self.wait_until_connected(max_retries, stream, topic)
.await?;
- self.send_with_retries(
- max_retries,
- stream,
- topic,
- partitioning,
- messages,
- &mut timer,
- )
- .await
+ self.send_with_retries(&client, max_retries, stream, topic,
partitioning, messages)
+ .await
}
async fn wait_until_connected(
@@ -223,9 +209,10 @@ impl ProducerCore {
max_retries: u32,
stream: &Identifier,
topic: &Identifier,
- timer: &mut Option<Interval>,
) -> Result<(), IggyError> {
let mut retries = 0;
+ let mut timer: Option<Interval> = None;
+
while !self.can_send.load(ORDERING) {
retries += 1;
if retries > max_retries {
@@ -241,7 +228,9 @@ impl ProducerCore {
but the client is disconnected. Retrying
{retries}/{max_retries}..."
);
- if let Some(timer) = timer.as_mut() {
+ if let Some(interval) = self.send_retries_interval {
+ let timer =
+ timer.get_or_insert_with(||
tokio::time::interval(interval.get_duration()));
trace!(
"Waiting for the next retry to send messages to topic:
{topic}, \
stream: {stream} for disconnected client..."
@@ -254,15 +243,16 @@ impl ProducerCore {
async fn send_with_retries(
&self,
+ client: &ClientWrapper,
max_retries: u32,
stream: &Identifier,
topic: &Identifier,
partitioning: &Arc<Partitioning>,
messages: &mut [IggyMessage],
- timer: &mut Option<Interval>,
) -> Result<(), IggyError> {
- let client = self.client.read().await;
let mut retries = 0;
+ let mut timer: Option<Interval> = None;
+
loop {
match client
.send_messages(stream, topic, partitioning, messages)
@@ -284,12 +274,14 @@ impl ProducerCore {
{error} Retrying {retries}/{max_retries}..."
);
- if let Some(t) = timer.as_mut() {
+ if let Some(interval) = self.send_retries_interval {
+ let timer = timer
+ .get_or_insert_with(||
tokio::time::interval(interval.get_duration()));
trace!(
"Waiting for the next retry to send messages to
topic: {topic}, \
stream: {stream}..."
);
- t.tick().await;
+ timer.tick().await;
}
}
}
@@ -570,8 +562,8 @@ impl IggyProducer {
}
pub async fn shutdown(self) {
- if let Some(disp) = self.dispatcher {
- disp.shutdown().await;
+ if let Some(dispatcher) = self.dispatcher {
+ dispatcher.shutdown().await;
}
}
}
diff --git a/core/sdk/src/clients/producer_config.rs
b/core/sdk/src/clients/producer_config.rs
index dd3f301de..26d4bed6f 100644
--- a/core/sdk/src/clients/producer_config.rs
+++ b/core/sdk/src/clients/producer_config.rs
@@ -17,7 +17,7 @@
*/
use crate::clients::MIB;
use crate::clients::producer_error_callback::{ErrorCallback, LogErrorCallback};
-use crate::clients::producer_sharding::{BalancedSharding, Sharding};
+use crate::clients::producer_sharding::{OrderedSharding, Sharding};
use bon::Builder;
use iggy_common::{IggyByteSize, IggyDuration};
use std::sync::Arc;
@@ -67,10 +67,13 @@ pub enum BackpressureMode {
pub struct BackgroundConfig {
/// Number of shard-workers that run in parallel.
///
- /// The default is `num_cpus::get().clamp(2, 16)`.
- /// More shards increase throughput by parallelising
- /// serialisation, compression and network I/O, but consume more memory.
- #[builder(default = default_shard_count())]
+ /// With the default `OrderedSharding` strategy, messages to the same
+ /// stream/topic are always routed to the same shard, preserving ordering.
+ /// Increasing shards improves throughput only when sending to multiple
streams/topics.
+ ///
+ /// With `BalancedSharding`, messages are distributed round-robin across
all shards
+ /// for maximum single-destination throughput, but ordering is **not**
preserved.
+ #[builder(default = 1)]
pub num_shards: usize,
/// How long a shard may wait before flushing an *incomplete* batch.
///
@@ -84,26 +87,36 @@ pub struct BackgroundConfig {
#[builder(default = Arc::new(Box::new(LogErrorCallback)))]
pub error_callback: Arc<Box<dyn ErrorCallback + Send + Sync>>,
/// Strategy that maps a message to a shard.
- #[builder(default = Box::new(BalancedSharding::default()))]
+ ///
+ /// Default is `OrderedSharding` which routes all messages for the same
+ /// stream/topic to the same shard, preserving message ordering.
+ ///
+ /// Use `BalancedSharding` for maximum throughput when ordering doesn't
matter.
+ #[builder(default = Box::new(OrderedSharding))]
pub sharding: Box<dyn Sharding + Send + Sync>,
- /// Maximum **total size in bytes** of a batch.
+ /// Maximum **total size in bytes** of a batch.
/// `0` ⇒ unlimited (size-based batching disabled).
#[builder(default = MIB)]
pub batch_size: usize,
- /// Maximum **number of messages** per batch.
+ /// Maximum **number of messages** per batch.
/// `0` ⇒ unlimited (length-based batching disabled).
#[builder(default = 1000)]
pub batch_length: usize,
/// Action to apply when back-pressure limits are reached
#[builder(default = BackpressureMode::Block)]
pub failure_mode: BackpressureMode,
- /// Upper bound for the **bytes held in memory** across *all* shards.
+ /// Upper bound for the **bytes held in memory** across *all* shards.
/// `IggyByteSize::from(0)` ⇒ unlimited.
#[builder(default = IggyByteSize::from(32 * MIB as u64))]
pub max_buffer_size: IggyByteSize,
- /// Maximum number of **in-flight requests** (batches being sent).
- /// `0` ⇒ unlimited.
- #[builder(default = default_shard_count() * 2)]
+ /// Maximum number of **in-flight requests** (batches being sent).
+ ///
+ /// **WARNING**: Using more than 1 may cause message reordering if retries
occur.
+ /// With max_in_flight > 1, a failed batch could be retried after later
batches succeed.
+ ///
+ /// The default is `1` to preserve message ordering.
+ /// `0` ⇒ unlimited (no ordering guarantee).
+ #[builder(default = 1)]
pub max_in_flight: usize,
}
@@ -134,11 +147,6 @@ pub struct DirectConfig {
#[builder(default = 1000)]
pub batch_length: u32,
/// How long to wait for more messages before flushing the current set.
- #[builder(default = IggyDuration::from(1000))]
+ #[builder(default = IggyDuration::from(0))]
pub linger_time: IggyDuration,
}
-
-fn default_shard_count() -> usize {
- let cpus = num_cpus::get();
- cpus.clamp(2, 64)
-}
diff --git a/core/sdk/src/clients/producer_dispatcher.rs
b/core/sdk/src/clients/producer_dispatcher.rs
index fa6dded4c..c8ab06423 100644
--- a/core/sdk/src/clients/producer_dispatcher.rs
+++ b/core/sdk/src/clients/producer_dispatcher.rs
@@ -38,7 +38,12 @@ pub struct ProducerDispatcher {
impl ProducerDispatcher {
pub fn new(core: Arc<impl ProducerCoreBackend>, config: BackgroundConfig)
-> Self {
- let mut shards = Vec::with_capacity(config.num_shards);
+ let num_shards = if config.num_shards == 0 {
+ 1
+ } else {
+ config.num_shards
+ };
+ let mut shards = Vec::with_capacity(num_shards);
let config = Arc::new(config);
let (err_tx, err_rx) = flume::unbounded::<ErrorCtx>();
@@ -70,7 +75,7 @@ impl ProducerDispatcher {
}
});
- for _ in 0..config.num_shards {
+ for _ in 0..num_shards {
let stop_rx = stop_tx.subscribe();
shards.push(Shard::new(
core.clone(),
diff --git a/core/sdk/src/clients/producer_sharding.rs
b/core/sdk/src/clients/producer_sharding.rs
index 8bdca2213..545212984 100644
--- a/core/sdk/src/clients/producer_sharding.rs
+++ b/core/sdk/src/clients/producer_sharding.rs
@@ -19,6 +19,8 @@ use crate::clients::producer::ProducerCoreBackend;
use crate::clients::producer_config::BackgroundConfig;
use crate::clients::producer_error_callback::ErrorCtx;
use iggy_common::{Identifier, IggyByteSize, IggyError, IggyMessage,
Partitioning, Sizeable};
+use std::hash::DefaultHasher;
+use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::sync::{OwnedSemaphorePermit, broadcast};
@@ -42,13 +44,16 @@ pub trait Sharding: Send + Sync + std::fmt::Debug + 'static
{
/// A simple round-robin sharding strategy.
/// Distributes messages evenly across all shards by incrementing an atomic
counter.
+///
+/// **WARNING**: This strategy does NOT preserve message ordering across
shards.
+/// Messages to the same stream/topic may be processed out of order.
+/// Use `OrderedSharding` if message ordering is required.
#[derive(Default, Debug)]
pub struct BalancedSharding {
counter: AtomicUsize,
}
impl Sharding for BalancedSharding {
- /// Picks the next shard in a round-robin fashion.
fn pick_shard(
&self,
num_shards: usize,
@@ -60,6 +65,29 @@ impl Sharding for BalancedSharding {
}
}
+/// A sharding strategy that preserves message ordering by routing all messages
+/// for the same stream/topic combination to the same shard.
+///
+/// This ensures that messages sent to the same destination are processed
+/// in the order they were dispatched, even when using multiple shards.
+#[derive(Default, Debug)]
+pub struct OrderedSharding;
+
+impl Sharding for OrderedSharding {
+ fn pick_shard(
+ &self,
+ num_shards: usize,
+ _: &[IggyMessage],
+ stream: &Identifier,
+ topic: &Identifier,
+ ) -> usize {
+ let mut hasher = DefaultHasher::new();
+ stream.hash(&mut hasher);
+ topic.hash(&mut hasher);
+ (hasher.finish() as usize) % num_shards
+ }
+}
+
#[derive(Debug)]
pub struct ShardMessage {
pub stream: Arc<Identifier>,
@@ -169,6 +197,10 @@ impl Shard {
}
_ = stop_rx.recv() => {
closed_clone.store(true, Ordering::Release);
+ while let Ok(msg) = rx.try_recv() {
+ buffer_bytes +=
msg.inner.get_size_bytes().as_bytes_usize();
+ buffer.push(msg);
+ }
if !buffer.is_empty() {
Self::flush_buffer(&core, &mut buffer, &mut
buffer_bytes, &err_sender).await;
}
@@ -191,7 +223,22 @@ impl Shard {
buffer_bytes: &mut usize,
err_sender: &flume::Sender<ErrorCtx>,
) {
+ if buffer.is_empty() {
+ return;
+ }
+
+ let mut merged_batches: Vec<ShardMessageWithPermits> = Vec::new();
for msg in buffer.drain(..) {
+ if let Some(last) = merged_batches.last_mut()
+ && Self::same_destination(&last.inner, &msg.inner)
+ {
+ last.inner.messages.extend(msg.inner.messages);
+ continue;
+ }
+ merged_batches.push(msg);
+ }
+
+ for msg in merged_batches {
let result = core
.send_internal(
&msg.inner.stream,
@@ -201,13 +248,13 @@ impl Shard {
)
.await;
- if let Err(err) = result {
+ if let Err(error) = result {
if let IggyError::ProducerSendFailed {
failed,
cause,
stream_name,
topic_name,
- } = &err
+ } = &error
{
let ctx = ErrorCtx {
cause: cause.to_owned(),
@@ -220,7 +267,7 @@ impl Shard {
};
let _ = err_sender.send_async(ctx).await;
} else {
- tracing::error!("background send failed: {err}");
+ error!("Background send failed. {error}");
}
}
}
@@ -237,6 +284,12 @@ impl Shard {
IggyError::BackgroundSendError
})
}
+
+ fn same_destination(first: &ShardMessage, second: &ShardMessage) -> bool {
+ first.stream == second.stream
+ && first.topic == second.topic
+ && first.partitioning == second.partitioning
+ }
}
#[cfg(test)]
diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs
index 5459b3125..ba374fe6b 100644
--- a/core/sdk/src/prelude.rs
+++ b/core/sdk/src/prelude.rs
@@ -40,6 +40,7 @@ pub use crate::clients::consumer_builder::IggyConsumerBuilder;
pub use crate::clients::producer::IggyProducer;
pub use crate::clients::producer_builder::IggyProducerBuilder;
pub use crate::clients::producer_config::{BackgroundConfig, DirectConfig};
+pub use crate::clients::producer_sharding::{BalancedSharding, OrderedSharding,
Sharding};
pub use crate::consumer_ext::IggyConsumerMessageExt;
pub use crate::stream_builder::IggyConsumerConfig;
pub use crate::stream_builder::IggyStreamConsumer;
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 26170f137..3e4e7192f 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "server"
-version = "0.6.1-edge.5"
+version = "0.6.1-edge.6"
edition = "2024"
license = "Apache-2.0"
@@ -75,7 +75,7 @@ lending-iterator = "0.1.7"
metadata = { workspace = true }
mimalloc = { workspace = true, optional = true }
mime_guess = { version = "2.0", optional = true }
-moka = { version = "0.12.12", features = ["future"] }
+moka = { version = "0.12.13", features = ["future"] }
nix = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry-appender-tracing = { workspace = true }
@@ -95,7 +95,7 @@ send_wrapper = "0.6.0"
serde = { workspace = true }
serde_with = { workspace = true }
slab = { workspace = true }
-socket2 = "0.6.1"
+socket2 = "0.6.2"
static-toml = "1.3.0"
strum = { workspace = true }
sysinfo = { workspace = true }