This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 486cdbcf8 fix(sdk): improve high-level SDK direct and background producers (#2621)
486cdbcf8 is described below

commit 486cdbcf8c605c10d57dfbd45658e6f2164b1dfa
Author: Piotr Gankiewicz <[email protected]>
AuthorDate: Tue Jan 27 10:03:01 2026 +0100

    fix(sdk): improve high-level SDK direct and background producers (#2621)
    
    - Direct producer: removed redundant lock acquisition, lazy timer initialization
    - Background producer now preserves message ordering by default (`OrderedSharding`, `max_in_flight = 1`)
    - Added `shutdown()` to consumer - flushes pending offsets and leaves consumer group gracefully
    - Fixed message loss on shutdown - both producer and consumer now drain/flush before exit
    - Fixed `num_shards = 0` panic - treated as 1
    - Updated Rust dependencies
---
 Cargo.lock                                        | 158 ++++++++++++++++------
 Cargo.toml                                        |  28 ++--
 DEPENDENCIES.md                                   |  27 ++--
 core/ai/mcp/Cargo.toml                            |   4 +-
 core/bench/Cargo.toml                             |   2 +-
 core/bench/dashboard/server/Cargo.toml            |   2 +-
 core/binary_protocol/Cargo.toml                   |   2 +-
 core/common/Cargo.toml                            |   4 +-
 core/connectors/runtime/Cargo.toml                |   2 +-
 core/connectors/runtime/src/sink.rs               |   8 +-
 core/connectors/runtime/src/source.rs             |   8 +-
 core/connectors/sdk/Cargo.toml                    |   2 +-
 core/integration/Cargo.toml                       |   4 +-
 core/integration/src/test_mcp_server.rs           |   1 +
 core/integration/tests/mcp/mod.rs                 |   5 +-
 core/integration/tests/sdk/producer/background.rs | 143 ++++++++++++++++++++
 core/sdk/Cargo.toml                               |   3 +-
 core/sdk/src/clients/consumer.rs                  |  71 +++++++++-
 core/sdk/src/clients/producer.rs                  |  44 +++---
 core/sdk/src/clients/producer_config.rs           |  44 +++---
 core/sdk/src/clients/producer_dispatcher.rs       |   9 +-
 core/sdk/src/clients/producer_sharding.rs         |  61 ++++++++-
 core/sdk/src/prelude.rs                           |   1 +
 core/server/Cargo.toml                            |   6 +-
 24 files changed, 492 insertions(+), 147 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c8894b31d..4d342f57b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1093,7 +1093,7 @@ dependencies = [
  "rand 0.9.2",
  "serde",
  "serde_json",
- "sysinfo",
+ "sysinfo 0.38.0",
  "tracing",
  "uuid",
 ]
@@ -1317,7 +1317,7 @@ version = "3.8.2"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "89ec27229c38ed0eb3c0feee3d2c1d6a4379ae44f418a29a658890e062d8f365"
 dependencies = [
- "darling 0.23.0",
+ "darling 0.20.11",
  "ident_case",
  "prettyplease",
  "proc-macro2",
@@ -1665,7 +1665,7 @@ version = "3.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "faf9468729b8cbcea668e36183cb69d317348c2e08e994829fb56ebfdfbaac34"
 dependencies = [
- "windows-sys 0.61.2",
+ "windows-sys 0.48.0",
 ]
 
 [[package]]
@@ -2224,7 +2224,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "73736a89c4aff73035ba2ed2e565061954da00d4970fc9ac25dcc85a2a20d790"
 dependencies = [
  "dispatch2",
- "nix",
+ "nix 0.30.1",
  "windows-sys 0.61.2",
 ]
 
@@ -2665,7 +2665,7 @@ dependencies = [
  "libc",
  "option-ext",
  "redox_users",
- "windows-sys 0.61.2",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -2969,7 +2969,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb"
 dependencies = [
  "libc",
- "windows-sys 0.61.2",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -4175,7 +4175,7 @@ dependencies = [
  "libc",
  "percent-encoding",
  "pin-project-lite",
- "socket2 0.6.2",
+ "socket2 0.5.10",
  "tokio",
  "tower-service",
  "tracing",
@@ -4407,7 +4407,7 @@ dependencies = [
 
 [[package]]
 name = "iggy"
-version = "0.8.1-edge.5"
+version = "0.8.1-edge.6"
 dependencies = [
  "async-broadcast",
  "async-dropper",
@@ -4421,7 +4421,6 @@ dependencies = [
  "iggy_binary_protocol",
  "iggy_common",
  "mockall",
- "num_cpus",
  "quinn",
  "reqwest",
  "reqwest-middleware",
@@ -4440,7 +4439,7 @@ dependencies = [
 
 [[package]]
 name = "iggy-bench"
-version = "0.3.1-edge.1"
+version = "0.3.1-edge.2"
 dependencies = [
  "async-trait",
  "bench-report",
@@ -4458,7 +4457,7 @@ dependencies = [
  "rand 0.9.2",
  "rayon",
  "serde",
- "sysinfo",
+ "sysinfo 0.38.0",
  "tokio",
  "tracing",
  "tracing-appender",
@@ -4515,7 +4514,7 @@ dependencies = [
 
 [[package]]
 name = "iggy-connectors"
-version = "0.2.1-edge.4"
+version = "0.2.1-edge.5"
 dependencies = [
  "async-trait",
  "axum",
@@ -4563,7 +4562,7 @@ dependencies = [
 
 [[package]]
 name = "iggy-mcp"
-version = "0.2.1-edge.4"
+version = "0.2.1-edge.5"
 dependencies = [
  "axum",
  "axum-server",
@@ -4594,7 +4593,7 @@ dependencies = [
 
 [[package]]
 name = "iggy_binary_protocol"
-version = "0.8.1-edge.2"
+version = "0.8.1-edge.3"
 dependencies = [
  "anyhow",
  "async-broadcast",
@@ -4615,7 +4614,7 @@ dependencies = [
 
 [[package]]
 name = "iggy_common"
-version = "0.8.1-edge.1"
+version = "0.8.1-edge.2"
 dependencies = [
  "aes-gcm",
  "ahash 0.8.12",
@@ -4639,7 +4638,7 @@ dependencies = [
  "figment",
  "human-repr",
  "humantime",
- "nix",
+ "nix 0.31.1",
  "once_cell",
  "rcgen",
  "ring",
@@ -4786,7 +4785,7 @@ dependencies = [
 
 [[package]]
 name = "iggy_connector_sdk"
-version = "0.1.1-edge.2"
+version = "0.1.1-edge.3"
 dependencies = [
  "async-trait",
  "base64 0.22.1",
@@ -5089,7 +5088,7 @@ dependencies = [
  "portable-atomic",
  "portable-atomic-util",
  "serde_core",
- "windows-sys 0.61.2",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -5782,6 +5781,18 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "nix"
+version = "0.31.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "225e7cfe711e0ba79a68baeddb2982723e4235247aefce1482f2f16c27865b66"
+dependencies = [
+ "bitflags 2.10.0",
+ "cfg-if",
+ "cfg_aliases",
+ "libc",
+]
+
 [[package]]
 name = "nom"
 version = "7.1.3"
@@ -5893,7 +5904,7 @@ version = "0.50.3"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
 dependencies = [
- "windows-sys 0.61.2",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -6349,7 +6360,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "7d8fae84b431384b68627d0f9b3b1245fcf9f46f6c0e3dc902e9dce64edd1967"
 dependencies = [
  "libc",
- "windows-sys 0.61.2",
+ "windows-sys 0.48.0",
 ]
 
 [[package]]
@@ -7070,7 +7081,7 @@ dependencies = [
  "quinn-udp",
  "rustc-hash",
  "rustls",
- "socket2 0.6.2",
+ "socket2 0.5.10",
  "thiserror 2.0.18",
  "tokio",
  "tracing",
@@ -7109,9 +7120,9 @@ dependencies = [
  "cfg_aliases",
  "libc",
  "once_cell",
- "socket2 0.6.2",
+ "socket2 0.5.10",
  "tracing",
- "windows-sys 0.60.2",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -7532,9 +7543,9 @@ dependencies = [
 
 [[package]]
 name = "rmcp"
-version = "0.13.0"
+version = "0.14.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "d1815dbc06c414d720f8bc1951eccd66bc99efc6376331f1e7093a119b3eb508"
+checksum = "0a621b37a548ff6ab6292d57841eb25785a7f146d89391a19c9f199414bd13da"
 dependencies = [
  "async-trait",
  "axum",
@@ -7565,9 +7576,9 @@ dependencies = [
 
 [[package]]
 name = "rmcp-macros"
-version = "0.13.0"
+version = "0.14.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "11f0bc7008fa102e771a76c6d2c9b253be3f2baa5964e060464d038ae1cbc573"
+checksum = "6b79ed92303f9262db79575aa8c3652581668e9d136be6fd0b9ededa78954c95"
 dependencies = [
  "darling 0.23.0",
  "proc-macro2",
@@ -7725,7 +7736,7 @@ dependencies = [
  "errno",
  "libc",
  "linux-raw-sys",
- "windows-sys 0.61.2",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -7793,7 +7804,7 @@ dependencies = [
  "security-framework",
  "security-framework-sys",
  "webpki-root-certs",
- "windows-sys 0.61.2",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -7979,7 +7990,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "5b55fb86dfd3a2f5f76ea78310a88f96c4ea21a3031f8d212443d56123fd0521"
 dependencies = [
  "libc",
- "windows-sys 0.61.2",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -8206,7 +8217,7 @@ dependencies = [
 
 [[package]]
 name = "server"
-version = "0.6.1-edge.5"
+version = "0.6.1-edge.6"
 dependencies = [
  "ahash 0.8.12",
  "anyhow",
@@ -8247,7 +8258,7 @@ dependencies = [
  "mimalloc",
  "mime_guess",
  "moka",
- "nix",
+ "nix 0.31.1",
  "opentelemetry",
  "opentelemetry-appender-tracing",
  "opentelemetry-otlp",
@@ -8269,7 +8280,7 @@ dependencies = [
  "socket2 0.6.2",
  "static-toml",
  "strum",
- "sysinfo",
+ "sysinfo 0.38.0",
  "tempfile",
  "thiserror 2.0.18",
  "tokio",
@@ -8887,7 +8898,21 @@ dependencies = [
  "ntapi",
  "objc2-core-foundation",
  "objc2-io-kit",
- "windows",
+ "windows 0.61.3",
+]
+
+[[package]]
+name = "sysinfo"
+version = "0.38.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "fe840c5b1afe259a5657392a4dbb74473a14c8db999c3ec2f4ae812e028a94da"
+dependencies = [
+ "libc",
+ "memchr",
+ "ntapi",
+ "objc2-core-foundation",
+ "objc2-io-kit",
+ "windows 0.62.2",
 ]
 
 [[package]]
@@ -8923,7 +8948,7 @@ dependencies = [
  "getrandom 0.3.4",
  "once_cell",
  "rustix",
- "windows-sys 0.61.2",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -9884,7 +9909,7 @@ dependencies = [
  "regex",
  "rustc_version",
  "rustversion",
- "sysinfo",
+ "sysinfo 0.37.2",
  "time",
  "vergen-lib",
 ]
@@ -10148,7 +10173,7 @@ version = "0.1.11"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
 dependencies = [
- "windows-sys 0.61.2",
+ "windows-sys 0.48.0",
 ]
 
 [[package]]
@@ -10163,11 +10188,23 @@ version = "0.61.3"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "9babd3a767a4c1aef6900409f85f5d53ce2544ccdfaa86dad48c91782c6d6893"
 dependencies = [
- "windows-collections",
+ "windows-collections 0.2.0",
  "windows-core 0.61.2",
- "windows-future",
+ "windows-future 0.2.1",
  "windows-link 0.1.3",
- "windows-numerics",
+ "windows-numerics 0.2.0",
+]
+
+[[package]]
+name = "windows"
+version = "0.62.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "527fadee13e0c05939a6a05d5bd6eec6cd2e3dbd648b9f8e447c6518133d8580"
+dependencies = [
+ "windows-collections 0.3.2",
+ "windows-core 0.62.2",
+ "windows-future 0.3.2",
+ "windows-numerics 0.3.1",
 ]
 
 [[package]]
@@ -10179,6 +10216,15 @@ dependencies = [
  "windows-core 0.61.2",
 ]
 
+[[package]]
+name = "windows-collections"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "23b2d95af1a8a14a3c7367e1ed4fc9c20e0a26e79551b1454d72583c97cc6610"
+dependencies = [
+ "windows-core 0.62.2",
+]
+
 [[package]]
 name = "windows-core"
 version = "0.61.2"
@@ -10213,7 +10259,18 @@ checksum = 
"fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e"
 dependencies = [
  "windows-core 0.61.2",
  "windows-link 0.1.3",
- "windows-threading",
+ "windows-threading 0.1.0",
+]
+
+[[package]]
+name = "windows-future"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "e1d6f90251fe18a279739e78025bd6ddc52a7e22f921070ccdc67dde84c605cb"
+dependencies = [
+ "windows-core 0.62.2",
+ "windows-link 0.2.1",
+ "windows-threading 0.2.1",
 ]
 
 [[package]]
@@ -10260,6 +10317,16 @@ dependencies = [
  "windows-link 0.1.3",
 ]
 
+[[package]]
+name = "windows-numerics"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "6e2e40844ac143cdb44aead537bbf727de9b044e107a0f1220392177d15b0f26"
+dependencies = [
+ "windows-core 0.62.2",
+ "windows-link 0.2.1",
+]
+
 [[package]]
 name = "windows-result"
 version = "0.3.4"
@@ -10422,6 +10489,15 @@ dependencies = [
  "windows-link 0.1.3",
 ]
 
+[[package]]
+name = "windows-threading"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "3949bd5b99cafdf1c7ca86b43ca564028dfe27d66958f2470940f73d86d75b37"
+dependencies = [
+ "windows-link 0.2.1",
+]
+
 [[package]]
 name = "windows_aarch64_gnullvm"
 version = "0.42.2"
diff --git a/Cargo.toml b/Cargo.toml
index d845a659a..129bc2203 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -58,9 +58,9 @@ aes-gcm = "0.10.3"
 ahash = { version = "0.8.12", features = ["serde"] }
 anyhow = "1.0.100"
 argon2 = "0.5.3"
-arrow = "57.0.0"
-arrow-array = "57.0.0"
-arrow-json = "57.0.0"
+arrow = "57.2.0"
+arrow-array = "57.2.0"
+arrow-json = "57.2.0"
 async-broadcast = "0.7.2"
 async-channel = "2.5.0"
 async-dropper = { version = "0.3.1", features = ["tokio", "simple"] }
@@ -135,10 +135,10 @@ humantime = "2.3.0"
 hwlocality = "1.0.0-alpha.11"
 iceberg = "0.8.0"
 iceberg-catalog-rest = "0.8.0"
-iggy = { path = "core/sdk", version = "0.8.1-edge.1" }
-iggy_binary_protocol = { path = "core/binary_protocol", version = 
"0.8.1-edge.1" }
-iggy_common = { path = "core/common", version = "0.8.1-edge.1" }
-iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.1.1-edge.1" }
+iggy = { path = "core/sdk", version = "0.8.1-edge.6" }
+iggy_binary_protocol = { path = "core/binary_protocol", version = 
"0.8.1-edge.3" }
+iggy_common = { path = "core/common", version = "0.8.1-edge.2" }
+iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.1.1-edge.3" }
 integration = { path = "core/integration" }
 keyring = { version = "3.6.3", features = ["sync-secret-service", "vendored"] }
 lazy_static = "1.5.0"
@@ -147,7 +147,7 @@ log = "0.4.29"
 metadata = { path = "core/metadata" }
 mimalloc = "0.1"
 mockall = "0.14.0"
-nix = { version = "0.30.1", features = ["fs", "resource", "sched"] }
+nix = { version = "0.31.1", features = ["fs", "resource", "sched"] }
 nonzero_lit = "0.1.2"
 once_cell = "1.21.3"
 opentelemetry = { version = "0.31.0", features = ["trace", "logs"] }
@@ -168,7 +168,7 @@ opentelemetry_sdk = { version = "0.31.0", features = [
     "experimental_logs_batch_log_processor_with_async_runtime",
     "experimental_trace_batch_span_processor_with_async_runtime",
 ] }
-parquet = "57.0.0"
+parquet = "57.2.0"
 
 passterm = "=2.0.1"
 postcard = { version = "1.1.3", features = ["alloc"] }
@@ -197,13 +197,13 @@ serde_yaml_ng = "0.10.0"
 serial_test = "3.3.1"
 server = { path = "core/server" }
 simd-json = { version = "0.17.0", features = ["serde_impl"] }
-slab = "0.4.1"
+slab = "0.4.11"
 strum = { version = "0.27.2", features = ["derive"] }
 strum_macros = "0.27.2"
-sysinfo = "0.37.2"
+sysinfo = "0.38.0"
 tempfile = "3.24.0"
 test-case = "3.3.1"
-thiserror = "2.0.17"
+thiserror = "2.0.18"
 tokio = { version = "1.49.0", features = ["full"] }
 tokio-rustls = "0.26.4"
 tokio-tungstenite = { version = "0.28", features = ["rustls-tls-webpki-roots"] 
}
@@ -225,7 +225,7 @@ tracing-subscriber = { version = "0.3.22", default-features 
= false, features =
 trait-variant = "0.1.2"
 tungstenite = "0.28.0"
 twox-hash = { version = "2.1.2", features = ["xxhash32"] }
-uuid = { version = "1.19.0", features = [
+uuid = { version = "1.20.0", features = [
     "v4",
     "v7",
     "fast-rng",
@@ -233,7 +233,7 @@ uuid = { version = "1.19.0", features = [
     "zerocopy",
 ] }
 webpki-roots = "1.0.5"
-zip = { version = "7.1.0", default-features = false, features = ["deflate"] }
+zip = { version = "7.2.0", default-features = false, features = ["deflate"] }
 
 [profile.release]
 lto = true
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 7d4f8133a..1320800e7 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -383,14 +383,14 @@ icu_provider: 2.1.1, "Unicode-3.0",
 ident_case: 1.0.1, "Apache-2.0 OR MIT",
 idna: 1.1.0, "Apache-2.0 OR MIT",
 idna_adapter: 1.2.1, "Apache-2.0 OR MIT",
-iggy: 0.8.1-edge.5, "Apache-2.0",
-iggy-bench: 0.3.1-edge.1, "Apache-2.0",
+iggy: 0.8.1-edge.6, "Apache-2.0",
+iggy-bench: 0.3.1-edge.2, "Apache-2.0",
 iggy-bench-dashboard-server: 0.5.1-edge.1, "Apache-2.0",
 iggy-cli: 0.10.1-edge.1, "Apache-2.0",
-iggy-connectors: 0.2.1-edge.4, "Apache-2.0",
-iggy-mcp: 0.2.1-edge.4, "Apache-2.0",
-iggy_binary_protocol: 0.8.1-edge.2, "Apache-2.0",
-iggy_common: 0.8.1-edge.1, "Apache-2.0",
+iggy-connectors: 0.2.1-edge.5, "Apache-2.0",
+iggy-mcp: 0.2.1-edge.5, "Apache-2.0",
+iggy_binary_protocol: 0.8.1-edge.3, "Apache-2.0",
+iggy_common: 0.8.1-edge.2, "Apache-2.0",
 iggy_connector_elasticsearch_sink: 0.2.0-edge.1, "Apache-2.0",
 iggy_connector_elasticsearch_source: 0.2.0-edge.1, "Apache-2.0",
 iggy_connector_iceberg_sink: 0.2.0-edge.1, "Apache-2.0",
@@ -398,7 +398,7 @@ iggy_connector_postgres_sink: 0.2.0-edge.1, "Apache-2.0",
 iggy_connector_postgres_source: 0.2.0-edge.1, "Apache-2.0",
 iggy_connector_quickwit_sink: 0.2.0-edge.1, "Apache-2.0",
 iggy_connector_random_source: 0.2.0-edge.1, "Apache-2.0",
-iggy_connector_sdk: 0.1.1-edge.2, "Apache-2.0",
+iggy_connector_sdk: 0.1.1-edge.3, "Apache-2.0",
 iggy_connector_stdout_sink: 0.2.0-edge.1, "Apache-2.0",
 iggy_examples: 0.0.5, "Apache-2.0",
 ignore: 0.4.25, "MIT OR Unlicense",
@@ -496,6 +496,7 @@ moka: 0.12.13, "(Apache-2.0 OR MIT) AND Apache-2.0",
 murmur3: 0.5.2, "Apache-2.0 OR MIT",
 never-say-never: 6.6.666, "Apache-2.0 OR MIT OR Zlib",
 nix: 0.30.1, "MIT",
+nix: 0.31.1, "MIT",
 nom: 7.1.3, "MIT",
 nom: 8.0.0, "MIT",
 nom_locate: 5.0.0, "MIT",
@@ -654,8 +655,8 @@ ring: 0.17.14, "Apache-2.0 AND ISC",
 ringbuffer: 0.16.0, "MIT",
 rkyv: 0.7.46, "MIT",
 rkyv_derive: 0.7.46, "MIT",
-rmcp: 0.13.0, "Apache-2.0",
-rmcp-macros: 0.13.0, "Apache-2.0",
+rmcp: 0.14.0, "Apache-2.0",
+rmcp-macros: 0.14.0, "Apache-2.0",
 rmp: 0.8.15, "MIT",
 rmp-serde: 1.3.1, "MIT",
 roaring: 0.11.3, "Apache-2.0 OR MIT",
@@ -715,7 +716,7 @@ serde_with_macros: 3.16.1, "Apache-2.0 OR MIT",
 serde_yaml_ng: 0.10.0, "MIT",
 serial_test: 3.3.1, "MIT",
 serial_test_derive: 3.3.1, "MIT",
-server: 0.6.1-edge.5, "Apache-2.0",
+server: 0.6.1-edge.6, "Apache-2.0",
 sha1: 0.10.6, "Apache-2.0 OR MIT",
 sha2: 0.10.9, "Apache-2.0 OR MIT",
 sha3: 0.10.8, "Apache-2.0 OR MIT",
@@ -765,6 +766,7 @@ synthez: 0.4.0, "BlueOak-1.0.0",
 synthez-codegen: 0.4.0, "BlueOak-1.0.0",
 synthez-core: 0.4.0, "BlueOak-1.0.0",
 sysinfo: 0.37.2, "MIT",
+sysinfo: 0.38.0, "MIT",
 tagptr: 0.2.0, "Apache-2.0 OR MIT",
 tap: 1.0.1, "MIT",
 tar: 0.4.44, "Apache-2.0 OR MIT",
@@ -890,15 +892,19 @@ winapi-i686-pc-windows-gnu: 0.4.0, "Apache-2.0 OR MIT",
 winapi-util: 0.1.11, "MIT OR Unlicense",
 winapi-x86_64-pc-windows-gnu: 0.4.0, "Apache-2.0 OR MIT",
 windows: 0.61.3, "Apache-2.0 OR MIT",
+windows: 0.62.2, "Apache-2.0 OR MIT",
 windows-collections: 0.2.0, "Apache-2.0 OR MIT",
+windows-collections: 0.3.2, "Apache-2.0 OR MIT",
 windows-core: 0.61.2, "Apache-2.0 OR MIT",
 windows-core: 0.62.2, "Apache-2.0 OR MIT",
 windows-future: 0.2.1, "Apache-2.0 OR MIT",
+windows-future: 0.3.2, "Apache-2.0 OR MIT",
 windows-implement: 0.60.2, "Apache-2.0 OR MIT",
 windows-interface: 0.59.3, "Apache-2.0 OR MIT",
 windows-link: 0.1.3, "Apache-2.0 OR MIT",
 windows-link: 0.2.1, "Apache-2.0 OR MIT",
 windows-numerics: 0.2.0, "Apache-2.0 OR MIT",
+windows-numerics: 0.3.1, "Apache-2.0 OR MIT",
 windows-result: 0.3.4, "Apache-2.0 OR MIT",
 windows-result: 0.4.1, "Apache-2.0 OR MIT",
 windows-strings: 0.4.2, "Apache-2.0 OR MIT",
@@ -914,6 +920,7 @@ windows-targets: 0.48.5, "Apache-2.0 OR MIT",
 windows-targets: 0.52.6, "Apache-2.0 OR MIT",
 windows-targets: 0.53.5, "Apache-2.0 OR MIT",
 windows-threading: 0.1.0, "Apache-2.0 OR MIT",
+windows-threading: 0.2.1, "Apache-2.0 OR MIT",
 windows_aarch64_gnullvm: 0.42.2, "Apache-2.0 OR MIT",
 windows_aarch64_gnullvm: 0.48.5, "Apache-2.0 OR MIT",
 windows_aarch64_gnullvm: 0.52.6, "Apache-2.0 OR MIT",
diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml
index f8f8c138e..d9c8b9ad9 100644
--- a/core/ai/mcp/Cargo.toml
+++ b/core/ai/mcp/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy-mcp"
-version = "0.2.1-edge.4"
+version = "0.2.1-edge.5"
 description = "MCP Server for Iggy message streaming platform"
 edition = "2024"
 license = "Apache-2.0"
@@ -41,7 +41,7 @@ opentelemetry-otlp = { workspace = true }
 opentelemetry-semantic-conventions = { workspace = true }
 opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] }
 reqwest = { workspace = true }
-rmcp = { version = "0.13.0", features = [
+rmcp = { version = "0.14.0", features = [
     "server",
     "transport-io",
     "transport-streamable-http-server",
diff --git a/core/bench/Cargo.toml b/core/bench/Cargo.toml
index 42ed50e44..bbfbae035 100644
--- a/core/bench/Cargo.toml
+++ b/core/bench/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy-bench"
-version = "0.3.1-edge.1"
+version = "0.3.1-edge.2"
 edition = "2024"
 license = "Apache-2.0"
 repository = "https://github.com/apache/iggy";
diff --git a/core/bench/dashboard/server/Cargo.toml 
b/core/bench/dashboard/server/Cargo.toml
index 6303a663a..0ae74810b 100644
--- a/core/bench/dashboard/server/Cargo.toml
+++ b/core/bench/dashboard/server/Cargo.toml
@@ -30,7 +30,7 @@ bench-report = { workspace = true }
 chrono = { workspace = true, features = ["serde"] }
 clap = { workspace = true }
 dashmap = { workspace = true }
-file-operation = "0.8.11"
+file-operation = "0.8.17"
 notify = "8.2.0"
 octocrab = "0.49.5"
 serde = { workspace = true, features = ["derive"] }
diff --git a/core/binary_protocol/Cargo.toml b/core/binary_protocol/Cargo.toml
index bb10cb841..c015b5257 100644
--- a/core/binary_protocol/Cargo.toml
+++ b/core/binary_protocol/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy_binary_protocol"
-version = "0.8.1-edge.2"
+version = "0.8.1-edge.3"
 description = "Iggy is the persistent message streaming platform written in 
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing 
millions of messages per second."
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index a529c1ab3..52ea4c5ea 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -16,7 +16,7 @@
 # under the License.
 [package]
 name = "iggy_common"
-version = "0.8.1-edge.1"
+version = "0.8.1-edge.2"
 description = "Iggy is the persistent message streaming platform written in 
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing 
millions of messages per second."
 edition = "2024"
 license = "Apache-2.0"
@@ -57,7 +57,7 @@ human-repr = { workspace = true }
 humantime = { workspace = true }
 nix = { workspace = true }
 once_cell = { workspace = true }
-rcgen = "0.14.6"
+rcgen = "0.14.7"
 ring = "0.17.14"
 rustls = { workspace = true }
 serde = { workspace = true }
diff --git a/core/connectors/runtime/Cargo.toml 
b/core/connectors/runtime/Cargo.toml
index 0ec17f5b3..dea88451a 100644
--- a/core/connectors/runtime/Cargo.toml
+++ b/core/connectors/runtime/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy-connectors"
-version = "0.2.1-edge.4"
+version = "0.2.1-edge.5"
 description = "Connectors runtime for Iggy message streaming platform"
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/connectors/runtime/src/sink.rs 
b/core/connectors/runtime/src/sink.rs
index 3f55d4d8a..40b41d886 100644
--- a/core/connectors/runtime/src/sink.rs
+++ b/core/connectors/runtime/src/sink.rs
@@ -181,16 +181,14 @@ pub async fn init(
 pub fn consume(sinks: Vec<SinkConnectorWrapper>, context: Arc<RuntimeContext>) 
{
     for sink in sinks {
         for plugin in sink.plugins {
-            if plugin.error.is_none() {
-                info!("Starting consume for sink with ID: {}...", plugin.id);
-            } else {
+            if let Some(error) = &plugin.error {
                 error!(
-                    "Failed to initialize sink connector with ID: {}: {}. 
Skipping...",
+                    "Failed to initialize sink connector with ID: {}: {error}. 
Skipping...",
                     plugin.id,
-                    plugin.error.as_ref().expect("Error should be present")
                 );
                 continue;
             }
+            info!("Starting consume for sink with ID: {}...", plugin.id);
             for consumer in plugin.consumers {
                 let plugin_key = plugin.key.clone();
                 let context = context.clone();
diff --git a/core/connectors/runtime/src/source.rs 
b/core/connectors/runtime/src/source.rs
index d366a5de5..fba517989 100644
--- a/core/connectors/runtime/src/source.rs
+++ b/core/connectors/runtime/src/source.rs
@@ -226,15 +226,13 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>, 
context: Arc<RuntimeContext>
             let plugin_key = plugin.key.clone();
             let context = context.clone();
 
-            if plugin.error.is_none() {
-                info!("Starting handler for source connector with ID: 
{plugin_id}...");
-            } else {
+            if let Some(error) = &plugin.error {
                 error!(
-                    "Failed to initialize source connector with ID: 
{plugin_id}: {}. Skipping...",
-                    plugin.error.as_ref().expect("Error should be present")
+                    "Failed to initialize source connector with ID: 
{plugin_id}: {error}. Skipping...",
                 );
                 continue;
             }
+            info!("Starting handler for source connector with ID: 
{plugin_id}...");
 
             let handle = source.callback;
             tokio::task::spawn_blocking(move || {
diff --git a/core/connectors/sdk/Cargo.toml b/core/connectors/sdk/Cargo.toml
index 72bafef01..530c5ba56 100644
--- a/core/connectors/sdk/Cargo.toml
+++ b/core/connectors/sdk/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy_connector_sdk"
-version = "0.1.1-edge.2"
+version = "0.1.1-edge.3"
 description = "Iggy is the persistent message streaming platform written in 
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing 
millions of messages per second."
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index 6dec491f5..87b95b6f1 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -47,9 +47,9 @@ libc = "0.2.180"
 log = { workspace = true }
 predicates = { workspace = true }
 rand = { workspace = true }
-rcgen = "0.14.6"
+rcgen = "0.14.7"
 reqwest = { workspace = true }
-rmcp = { version = "0.13.0", features = [
+rmcp = { version = "0.14.0", features = [
     "client",
     "reqwest",
     "transport-streamable-http-client",
diff --git a/core/integration/src/test_mcp_server.rs 
b/core/integration/src/test_mcp_server.rs
index 9d729eaa8..f4179a698 100644
--- a/core/integration/src/test_mcp_server.rs
+++ b/core/integration/src/test_mcp_server.rs
@@ -182,6 +182,7 @@ impl TestMcpServer {
         let client_info = ClientInfo {
             protocol_version: Default::default(),
             capabilities: ClientCapabilities::default(),
+            meta: None,
             client_info: Implementation {
                 name: "test-mcp-client".to_string(),
                 version: "1.0.0".to_string(),
diff --git a/core/integration/tests/mcp/mod.rs 
b/core/integration/tests/mcp/mod.rs
index 3fab31930..0fadd7ea3 100644
--- a/core/integration/tests/mcp/mod.rs
+++ b/core/integration/tests/mcp/mod.rs
@@ -35,7 +35,7 @@ use integration::{
 use lazy_static::lazy_static;
 use rmcp::{
     ServiceError,
-    model::{CallToolRequestParam, CallToolResult, ListToolsResult},
+    model::{CallToolRequestParams, CallToolResult, ListToolsResult},
     serde::de::DeserializeOwned,
     serde_json::{self, json},
 };
@@ -698,10 +698,11 @@ impl TestMcpClient {
         data: Option<serde_json::Value>,
     ) -> Result<CallToolResult, ServiceError> {
         self.mcp_client
-            .call_tool(CallToolRequestParam {
+            .call_tool(CallToolRequestParams {
                 name: method.to_owned().into(),
                 arguments: data.and_then(|value| value.as_object().cloned()),
                 task: None,
+                meta: None,
             })
             .await
     }
diff --git a/core/integration/tests/sdk/producer/background.rs 
b/core/integration/tests/sdk/producer/background.rs
index d8d7818ae..8acb05de2 100644
--- a/core/integration/tests/sdk/producer/background.rs
+++ b/core/integration/tests/sdk/producer/background.rs
@@ -493,3 +493,146 @@ async fn background_linger_time_respected_after_idle() {
     producer.shutdown().await;
     cleanup(&client).await;
 }
+
+#[tokio::test]
+#[parallel]
+async fn background_preserves_message_ordering() {
+    let mut test_server = TestServer::default();
+    test_server.start();
+
+    let tcp_client_config = TcpClientConfig {
+        server_address: test_server.get_raw_tcp_addr().unwrap(),
+        ..TcpClientConfig::default()
+    };
+    let client = 
ClientWrapper::Tcp(TcpClient::create(Arc::new(tcp_client_config)).unwrap());
+    let client = IggyClient::create(client, None, None);
+
+    client.connect().await.unwrap();
+    login_root(&client).await;
+    init_system(&client).await;
+
+    let messages_count = 1000u32;
+
+    let cfg = BackgroundConfig::builder()
+        .linger_time(IggyDuration::from(50_000))
+        .build();
+
+    let producer = client
+        .producer(STREAM_NAME, TOPIC_NAME)
+        .unwrap()
+        .partitioning(Partitioning::partition_id(PARTITION_ID))
+        .background(cfg)
+        .build();
+
+    for i in 0..messages_count {
+        let payload = Bytes::from(i.to_le_bytes().to_vec());
+        let msg = IggyMessage::builder()
+            .id(i as u128 + 1)
+            .payload(payload)
+            .build()
+            .unwrap();
+        producer.send(vec![msg]).await.unwrap();
+    }
+
+    producer.shutdown().await;
+
+    let consumer = Consumer::default();
+    let polled_messages = client
+        .poll_messages(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            Some(PARTITION_ID),
+            &consumer,
+            &PollingStrategy::offset(0),
+            messages_count,
+            false,
+        )
+        .await
+        .unwrap();
+
+    assert_eq!(polled_messages.messages.len() as u32, messages_count);
+
+    for (idx, msg) in polled_messages.messages.iter().enumerate() {
+        let expected = idx as u32;
+        let actual = u32::from_le_bytes(msg.payload[..4].try_into().unwrap());
+        assert_eq!(
+            actual, expected,
+            "Message at position {} has wrong order: expected {}, got {}",
+            idx, expected, actual
+        );
+    }
+
+    cleanup(&client).await;
+}
+
+#[tokio::test]
+#[parallel]
+async fn background_preserves_ordering_with_multiple_shards() {
+    let mut test_server = TestServer::default();
+    test_server.start();
+
+    let tcp_client_config = TcpClientConfig {
+        server_address: test_server.get_raw_tcp_addr().unwrap(),
+        ..TcpClientConfig::default()
+    };
+    let client = 
ClientWrapper::Tcp(TcpClient::create(Arc::new(tcp_client_config)).unwrap());
+    let client = IggyClient::create(client, None, None);
+
+    client.connect().await.unwrap();
+    login_root(&client).await;
+    init_system(&client).await;
+
+    let messages_count = 1000u32;
+
+    let cfg = BackgroundConfig::builder()
+        .num_shards(4)
+        .linger_time(IggyDuration::from(50_000))
+        .build();
+
+    let producer = client
+        .producer(STREAM_NAME, TOPIC_NAME)
+        .unwrap()
+        .partitioning(Partitioning::partition_id(PARTITION_ID))
+        .background(cfg)
+        .build();
+
+    for i in 0..messages_count {
+        let payload = Bytes::from(i.to_le_bytes().to_vec());
+        let msg = IggyMessage::builder()
+            .id(i as u128 + 1)
+            .payload(payload)
+            .build()
+            .unwrap();
+        producer.send(vec![msg]).await.unwrap();
+    }
+
+    producer.shutdown().await;
+
+    let consumer = Consumer::default();
+    let polled_messages = client
+        .poll_messages(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            Some(PARTITION_ID),
+            &consumer,
+            &PollingStrategy::offset(0),
+            messages_count,
+            false,
+        )
+        .await
+        .unwrap();
+
+    assert_eq!(polled_messages.messages.len() as u32, messages_count);
+
+    for (idx, msg) in polled_messages.messages.iter().enumerate() {
+        let expected = idx as u32;
+        let actual = u32::from_le_bytes(msg.payload[..4].try_into().unwrap());
+        assert_eq!(
+            actual, expected,
+            "Message at position {} has wrong order: expected {}, got {}",
+            idx, expected, actual
+        );
+    }
+
+    cleanup(&client).await;
+}
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index c83cfc06b..6d33647bb 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy"
-version = "0.8.1-edge.5"
+version = "0.8.1-edge.6"
 description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
 edition = "2024"
 license = "Apache-2.0"
@@ -48,7 +48,6 @@ futures = { workspace = true }
 futures-util = { workspace = true }
 iggy_binary_protocol = { workspace = true }
 iggy_common = { workspace = true }
-num_cpus = "1.17.0"
 quinn = { workspace = true }
 reqwest = { workspace = true }
 reqwest-middleware = { workspace = true }
diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs
index 3d7784938..81be663a7 100644
--- a/core/sdk/src/clients/consumer.rs
+++ b/core/sdk/src/clients/consumer.rs
@@ -475,10 +475,6 @@ impl IggyConsumer {
         tokio::spawn(async move {
             loop {
                 sleep(interval.get_duration()).await;
-                if shutdown.load(ORDERING) {
-                    trace!("Shutdown signal received, stopping background offset storage");
-                    break;
-                }
                 for entry in last_consumed_offsets.iter() {
                     let partition_id = *entry.key();
                     let consumed_offset = entry.load(ORDERING);
@@ -494,6 +490,10 @@ impl IggyConsumer {
                     )
                     .await;
                 }
+                if shutdown.load(ORDERING) {
+                    trace!("Shutdown signal received, stopping background offset storage");
+                    break;
+                }
             }
         });
     }
@@ -1056,6 +1056,69 @@ impl Stream for IggyConsumer {
     }
 }
 
+impl IggyConsumer {
+    pub async fn shutdown(&mut self) -> Result<(), IggyError> {
+        if self.shutdown.swap(true, ORDERING) {
+            return Ok(());
+        }
+
+        info!("Shutting down consumer: {}...", self.consumer_name);
+
+        for entry in self.last_consumed_offsets.iter() {
+            let partition_id = *entry.key();
+            let consumed_offset = entry.load(ORDERING);
+
+            let stored_offset = self
+                .last_stored_offsets
+                .get(&partition_id)
+                .map(|e| e.load(ORDERING))
+                .unwrap_or(0);
+
+            if consumed_offset > stored_offset {
+                trace!(
+                    "Flushing final offset: {consumed_offset} for partition: {partition_id}, stream: {}, topic: {}",
+                    self.stream_id, self.topic_id
+                );
+                let _ = Self::store_consumer_offset(
+                    &self.client,
+                    &self.consumer,
+                    &self.stream_id,
+                    &self.topic_id,
+                    partition_id,
+                    consumed_offset,
+                    &self.last_stored_offsets,
+                    self.allow_replay,
+                )
+                .await;
+            }
+        }
+
+        if self.is_consumer_group && self.joined_consumer_group.load(ORDERING) {
+            let group_id = self.consumer.id.clone();
+            trace!(
+                "Leaving consumer group: {group_id} for stream: {}, topic: {}",
+                self.stream_id, self.topic_id
+            );
+
+            let client = self.client.read().await;
+            if let Err(error) = client
+                .leave_consumer_group(&self.stream_id, &self.topic_id, &group_id)
+                .await
+            {
+                warn!(
+                    "Failed to leave consumer group: {group_id} for stream: {}, topic: {}. {error}",
+                    self.stream_id, self.topic_id
+                );
+            } else {
+                self.joined_consumer_group.store(false, ORDERING);
+            }
+        }
+
+        info!("Consumer: {} has been shut down.", self.consumer_name);
+        Ok(())
+    }
+}
+
 impl Drop for IggyConsumer {
     fn drop(&mut self) {
         self.shutdown.store(true, ORDERING);
diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index 0a6ec018e..67a5bf8d4 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -185,6 +185,7 @@ impl ProducerCore {
         messages: &mut [IggyMessage],
     ) -> Result<(), IggyError> {
         let client = self.client.read().await;
+
         let Some(max_retries) = self.send_retries_count else {
             return client
                 .send_messages(stream, topic, partitioning, messages)
@@ -197,25 +198,10 @@ impl ProducerCore {
                 .await;
         }
 
-        let mut timer = if let Some(interval) = self.send_retries_interval {
-            let mut timer = tokio::time::interval(interval.get_duration());
-            timer.tick().await;
-            Some(timer)
-        } else {
-            None
-        };
-
-        self.wait_until_connected(max_retries, stream, topic, &mut timer)
+        self.wait_until_connected(max_retries, stream, topic)
             .await?;
-        self.send_with_retries(
-            max_retries,
-            stream,
-            topic,
-            partitioning,
-            messages,
-            &mut timer,
-        )
-        .await
+        self.send_with_retries(&client, max_retries, stream, topic, partitioning, messages)
+            .await
     }
 
     async fn wait_until_connected(
@@ -223,9 +209,10 @@ impl ProducerCore {
         max_retries: u32,
         stream: &Identifier,
         topic: &Identifier,
-        timer: &mut Option<Interval>,
     ) -> Result<(), IggyError> {
         let mut retries = 0;
+        let mut timer: Option<Interval> = None;
+
         while !self.can_send.load(ORDERING) {
             retries += 1;
             if retries > max_retries {
@@ -241,7 +228,9 @@ impl ProducerCore {
                  but the client is disconnected. Retrying {retries}/{max_retries}..."
             );
 
-            if let Some(timer) = timer.as_mut() {
+            if let Some(interval) = self.send_retries_interval {
+                let timer =
+                    timer.get_or_insert_with(|| tokio::time::interval(interval.get_duration()));
                 trace!(
                     "Waiting for the next retry to send messages to topic: {topic}, \
                      stream: {stream} for disconnected client..."
@@ -254,15 +243,16 @@ impl ProducerCore {
 
     async fn send_with_retries(
         &self,
+        client: &ClientWrapper,
         max_retries: u32,
         stream: &Identifier,
         topic: &Identifier,
         partitioning: &Arc<Partitioning>,
         messages: &mut [IggyMessage],
-        timer: &mut Option<Interval>,
     ) -> Result<(), IggyError> {
-        let client = self.client.read().await;
         let mut retries = 0;
+        let mut timer: Option<Interval> = None;
+
         loop {
             match client
                 .send_messages(stream, topic, partitioning, messages)
@@ -284,12 +274,14 @@ impl ProducerCore {
                          {error} Retrying {retries}/{max_retries}..."
                     );
 
-                    if let Some(t) = timer.as_mut() {
+                    if let Some(interval) = self.send_retries_interval {
+                        let timer = timer
+                            .get_or_insert_with(|| tokio::time::interval(interval.get_duration()));
                         trace!(
                             "Waiting for the next retry to send messages to topic: {topic}, \
                              stream: {stream}..."
                         );
-                        t.tick().await;
+                        timer.tick().await;
                     }
                 }
             }
@@ -570,8 +562,8 @@ impl IggyProducer {
     }
 
     pub async fn shutdown(self) {
-        if let Some(disp) = self.dispatcher {
-            disp.shutdown().await;
+        if let Some(dispatcher) = self.dispatcher {
+            dispatcher.shutdown().await;
         }
     }
 }
diff --git a/core/sdk/src/clients/producer_config.rs b/core/sdk/src/clients/producer_config.rs
index dd3f301de..26d4bed6f 100644
--- a/core/sdk/src/clients/producer_config.rs
+++ b/core/sdk/src/clients/producer_config.rs
@@ -17,7 +17,7 @@
  */
 use crate::clients::MIB;
 use crate::clients::producer_error_callback::{ErrorCallback, LogErrorCallback};
-use crate::clients::producer_sharding::{BalancedSharding, Sharding};
+use crate::clients::producer_sharding::{OrderedSharding, Sharding};
 use bon::Builder;
 use iggy_common::{IggyByteSize, IggyDuration};
 use std::sync::Arc;
@@ -67,10 +67,13 @@ pub enum BackpressureMode {
 pub struct BackgroundConfig {
     /// Number of shard-workers that run in parallel.
     ///
-    /// The default is `num_cpus::get().clamp(2, 16)`.  
-    /// More shards increase throughput by parallelising
-    /// serialisation, compression and network I/O, but consume more memory.
-    #[builder(default = default_shard_count())]
+    /// With the default `OrderedSharding` strategy, messages to the same
+    /// stream/topic are always routed to the same shard, preserving ordering.
+    /// Increasing shards improves throughput only when sending to multiple streams/topics.
+    ///
+    /// With `BalancedSharding`, messages are distributed round-robin across all shards
+    /// for maximum single-destination throughput, but ordering is **not** preserved.
+    #[builder(default = 1)]
     pub num_shards: usize,
     /// How long a shard may wait before flushing an *incomplete* batch.
     ///
@@ -84,26 +87,36 @@ pub struct BackgroundConfig {
     #[builder(default = Arc::new(Box::new(LogErrorCallback)))]
     pub error_callback: Arc<Box<dyn ErrorCallback + Send + Sync>>,
     /// Strategy that maps a message to a shard.
-    #[builder(default = Box::new(BalancedSharding::default()))]
+    ///
+    /// Default is `OrderedSharding` which routes all messages for the same
+    /// stream/topic to the same shard, preserving message ordering.
+    ///
+    /// Use `BalancedSharding` for maximum throughput when ordering doesn't matter.
+    #[builder(default = Box::new(OrderedSharding))]
     pub sharding: Box<dyn Sharding + Send + Sync>,
-    /// Maximum **total size in bytes** of a batch.  
+    /// Maximum **total size in bytes** of a batch.
     /// `0` ⇒ unlimited (size-based batching disabled).
     #[builder(default = MIB)]
     pub batch_size: usize,
-    /// Maximum **number of messages** per batch.  
+    /// Maximum **number of messages** per batch.
     /// `0` ⇒ unlimited (length-based batching disabled).
     #[builder(default = 1000)]
     pub batch_length: usize,
     /// Action to apply when back-pressure limits are reached
     #[builder(default = BackpressureMode::Block)]
     pub failure_mode: BackpressureMode,
-    /// Upper bound for the **bytes held in memory** across *all* shards.  
+    /// Upper bound for the **bytes held in memory** across *all* shards.
     /// `IggyByteSize::from(0)` ⇒ unlimited.
     #[builder(default = IggyByteSize::from(32 * MIB as u64))]
     pub max_buffer_size: IggyByteSize,
-    /// Maximum number of **in-flight requests** (batches being sent).  
-    /// `0` ⇒ unlimited.
-    #[builder(default = default_shard_count() * 2)]
+    /// Maximum number of **in-flight requests** (batches being sent).
+    ///
+    /// **WARNING**: Using more than 1 may cause message reordering if retries occur.
+    /// With max_in_flight > 1, a failed batch could be retried after later batches succeed.
+    ///
+    /// The default is `1` to preserve message ordering.
+    /// `0` ⇒ unlimited (no ordering guarantee).
+    #[builder(default = 1)]
     pub max_in_flight: usize,
 }
 
@@ -134,11 +147,6 @@ pub struct DirectConfig {
     #[builder(default = 1000)]
     pub batch_length: u32,
     /// How long to wait for more messages before flushing the current set.
-    #[builder(default = IggyDuration::from(1000))]
+    #[builder(default = IggyDuration::from(0))]
     pub linger_time: IggyDuration,
 }
-
-fn default_shard_count() -> usize {
-    let cpus = num_cpus::get();
-    cpus.clamp(2, 64)
-}
diff --git a/core/sdk/src/clients/producer_dispatcher.rs b/core/sdk/src/clients/producer_dispatcher.rs
index fa6dded4c..c8ab06423 100644
--- a/core/sdk/src/clients/producer_dispatcher.rs
+++ b/core/sdk/src/clients/producer_dispatcher.rs
@@ -38,7 +38,12 @@ pub struct ProducerDispatcher {
 
 impl ProducerDispatcher {
     pub fn new(core: Arc<impl ProducerCoreBackend>, config: BackgroundConfig) -> Self {
-        let mut shards = Vec::with_capacity(config.num_shards);
+        let num_shards = if config.num_shards == 0 {
+            1
+        } else {
+            config.num_shards
+        };
+        let mut shards = Vec::with_capacity(num_shards);
         let config = Arc::new(config);
 
         let (err_tx, err_rx) = flume::unbounded::<ErrorCtx>();
@@ -70,7 +75,7 @@ impl ProducerDispatcher {
             }
         });
 
-        for _ in 0..config.num_shards {
+        for _ in 0..num_shards {
             let stop_rx = stop_tx.subscribe();
             shards.push(Shard::new(
                 core.clone(),
diff --git a/core/sdk/src/clients/producer_sharding.rs b/core/sdk/src/clients/producer_sharding.rs
index 8bdca2213..545212984 100644
--- a/core/sdk/src/clients/producer_sharding.rs
+++ b/core/sdk/src/clients/producer_sharding.rs
@@ -19,6 +19,8 @@ use crate::clients::producer::ProducerCoreBackend;
 use crate::clients::producer_config::BackgroundConfig;
 use crate::clients::producer_error_callback::ErrorCtx;
 use iggy_common::{Identifier, IggyByteSize, IggyError, IggyMessage, Partitioning, Sizeable};
+use std::hash::DefaultHasher;
+use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use tokio::sync::{OwnedSemaphorePermit, broadcast};
@@ -42,13 +44,16 @@ pub trait Sharding: Send + Sync + std::fmt::Debug + 'static {
 
 /// A simple round-robin sharding strategy.
 /// Distributes messages evenly across all shards by incrementing an atomic counter.
+///
+/// **WARNING**: This strategy does NOT preserve message ordering across shards.
+/// Messages to the same stream/topic may be processed out of order.
+/// Use `OrderedSharding` if message ordering is required.
 #[derive(Default, Debug)]
 pub struct BalancedSharding {
     counter: AtomicUsize,
 }
 
 impl Sharding for BalancedSharding {
-    /// Picks the next shard in a round-robin fashion.
     fn pick_shard(
         &self,
         num_shards: usize,
@@ -60,6 +65,29 @@ impl Sharding for BalancedSharding {
     }
 }
 
+/// A sharding strategy that preserves message ordering by routing all messages
+/// for the same stream/topic combination to the same shard.
+///
+/// This ensures that messages sent to the same destination are processed
+/// in the order they were dispatched, even when using multiple shards.
+#[derive(Default, Debug)]
+pub struct OrderedSharding;
+
+impl Sharding for OrderedSharding {
+    fn pick_shard(
+        &self,
+        num_shards: usize,
+        _: &[IggyMessage],
+        stream: &Identifier,
+        topic: &Identifier,
+    ) -> usize {
+        let mut hasher = DefaultHasher::new();
+        stream.hash(&mut hasher);
+        topic.hash(&mut hasher);
+        (hasher.finish() as usize) % num_shards
+    }
+}
+
 #[derive(Debug)]
 pub struct ShardMessage {
     pub stream: Arc<Identifier>,
@@ -169,6 +197,10 @@ impl Shard {
                     }
                     _ = stop_rx.recv() => {
                         closed_clone.store(true, Ordering::Release);
+                        while let Ok(msg) = rx.try_recv() {
+                            buffer_bytes += msg.inner.get_size_bytes().as_bytes_usize();
+                            buffer.push(msg);
+                        }
                         if !buffer.is_empty() {
                             Self::flush_buffer(&core, &mut buffer, &mut buffer_bytes, &err_sender).await;
                         }
@@ -191,7 +223,22 @@ impl Shard {
         buffer_bytes: &mut usize,
         err_sender: &flume::Sender<ErrorCtx>,
     ) {
+        if buffer.is_empty() {
+            return;
+        }
+
+        let mut merged_batches: Vec<ShardMessageWithPermits> = Vec::new();
         for msg in buffer.drain(..) {
+            if let Some(last) = merged_batches.last_mut()
+                && Self::same_destination(&last.inner, &msg.inner)
+            {
+                last.inner.messages.extend(msg.inner.messages);
+                continue;
+            }
+            merged_batches.push(msg);
+        }
+
+        for msg in merged_batches {
             let result = core
                 .send_internal(
                     &msg.inner.stream,
@@ -201,13 +248,13 @@ impl Shard {
                 )
                 .await;
 
-            if let Err(err) = result {
+            if let Err(error) = result {
                 if let IggyError::ProducerSendFailed {
                     failed,
                     cause,
                     stream_name,
                     topic_name,
-                } = &err
+                } = &error
                 {
                     let ctx = ErrorCtx {
                         cause: cause.to_owned(),
@@ -220,7 +267,7 @@ impl Shard {
                     };
                     let _ = err_sender.send_async(ctx).await;
                 } else {
-                    tracing::error!("background send failed: {err}");
+                    error!("Background send failed. {error}");
                 }
             }
         }
@@ -237,6 +284,12 @@ impl Shard {
             IggyError::BackgroundSendError
         })
     }
+
+    fn same_destination(first: &ShardMessage, second: &ShardMessage) -> bool {
+        first.stream == second.stream
+            && first.topic == second.topic
+            && first.partitioning == second.partitioning
+    }
 }
 
 #[cfg(test)]
diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs
index 5459b3125..ba374fe6b 100644
--- a/core/sdk/src/prelude.rs
+++ b/core/sdk/src/prelude.rs
@@ -40,6 +40,7 @@ pub use crate::clients::consumer_builder::IggyConsumerBuilder;
 pub use crate::clients::producer::IggyProducer;
 pub use crate::clients::producer_builder::IggyProducerBuilder;
 pub use crate::clients::producer_config::{BackgroundConfig, DirectConfig};
+pub use crate::clients::producer_sharding::{BalancedSharding, OrderedSharding, Sharding};
 pub use crate::consumer_ext::IggyConsumerMessageExt;
 pub use crate::stream_builder::IggyConsumerConfig;
 pub use crate::stream_builder::IggyStreamConsumer;
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 26170f137..3e4e7192f 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "server"
-version = "0.6.1-edge.5"
+version = "0.6.1-edge.6"
 edition = "2024"
 license = "Apache-2.0"
 
@@ -75,7 +75,7 @@ lending-iterator = "0.1.7"
 metadata = { workspace = true }
 mimalloc = { workspace = true, optional = true }
 mime_guess = { version = "2.0", optional = true }
-moka = { version = "0.12.12", features = ["future"] }
+moka = { version = "0.12.13", features = ["future"] }
 nix = { workspace = true }
 opentelemetry = { workspace = true }
 opentelemetry-appender-tracing = { workspace = true }
@@ -95,7 +95,7 @@ send_wrapper = "0.6.0"
 serde = { workspace = true }
 serde_with = { workspace = true }
 slab = { workspace = true }
-socket2 = "0.6.1"
+socket2 = "0.6.2"
 static-toml = "1.3.0"
 strum = { workspace = true }
 sysinfo = { workspace = true }

Reply via email to