This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 6b700d85b fix(sdk): acquire max_in_flight permits at network send, not
dispatch (#2625)
6b700d85b is described below
commit 6b700d85b21e619ab93d77b4e9ba1e6517603f30
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Jan 28 09:48:54 2026 +0100
fix(sdk): acquire max_in_flight permits at network send, not dispatch
(#2625)
Slot permits were acquired in dispatch() and held until the buffered batch
was flushed. With max_in_flight=1, each message therefore had to wait a full
linger_time before the next one could be buffered: with linger set to 50 ms,
sending 1000 messages took 50+ seconds instead of ~50 ms.
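max_in_flight should limit the number of concurrent network requests, not the
buffer depth, so the slot permit is now acquired in the shard immediately
before each network send.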
---
Cargo.lock | 2 +-
DEPENDENCIES.md | 2 +-
core/sdk/Cargo.toml | 2 +-
core/sdk/src/clients/producer_dispatcher.rs | 68 ++++--------------
core/sdk/src/clients/producer_sharding.rs | 108 ++++++++++++++--------------
5 files changed, 71 insertions(+), 111 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index f7eeda3fb..5b992c303 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4429,7 +4429,7 @@ dependencies = [
[[package]]
name = "iggy"
-version = "0.8.1-edge.6"
+version = "0.8.1-edge.7"
dependencies = [
"async-broadcast",
"async-dropper",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index c00b459ed..94277cfcc 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -385,7 +385,7 @@ icu_provider: 2.1.1, "Unicode-3.0",
ident_case: 1.0.1, "Apache-2.0 OR MIT",
idna: 1.1.0, "Apache-2.0 OR MIT",
idna_adapter: 1.2.1, "Apache-2.0 OR MIT",
-iggy: 0.8.1-edge.6, "Apache-2.0",
+iggy: 0.8.1-edge.7, "Apache-2.0",
iggy-bench: 0.3.1-edge.2, "Apache-2.0",
iggy-bench-dashboard-server: 0.5.1-edge.1, "Apache-2.0",
iggy-cli: 0.10.1-edge.1, "Apache-2.0",
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index 6d33647bb..3f16531c6 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy"
-version = "0.8.1-edge.6"
+version = "0.8.1-edge.7"
description = "Iggy is the persistent message streaming platform written in
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing
millions of messages per second."
edition = "2024"
license = "Apache-2.0"
diff --git a/core/sdk/src/clients/producer_dispatcher.rs
b/core/sdk/src/clients/producer_dispatcher.rs
index c8ab06423..16d2a0e79 100644
--- a/core/sdk/src/clients/producer_dispatcher.rs
+++ b/core/sdk/src/clients/producer_dispatcher.rs
@@ -18,7 +18,7 @@
use crate::clients::producer::ProducerCoreBackend;
use crate::clients::producer_config::{BackgroundConfig, BackpressureMode};
use crate::clients::producer_error_callback::ErrorCtx;
-use crate::clients::producer_sharding::{Shard, ShardMessage,
ShardMessageWithPermits};
+use crate::clients::producer_sharding::{Shard, ShardMessage,
ShardMessageWithPermit};
use futures::FutureExt;
use iggy_common::{Identifier, IggyError, IggyMessage, Partitioning, Sizeable};
use std::sync::Arc;
@@ -30,7 +30,6 @@ pub struct ProducerDispatcher {
shards: Vec<Shard>,
config: Arc<BackgroundConfig>,
closed: AtomicBool,
- slots_permit: Arc<Semaphore>,
bytes_permit: Arc<Semaphore>,
stop_tx: broadcast::Sender<()>,
_join_handle: JoinHandle<()>,
@@ -75,33 +74,33 @@ impl ProducerDispatcher {
}
});
+ let bytes_permit = {
+ let bytes = config.max_buffer_size.as_bytes_usize();
+ if bytes == 0 { usize::MAX } else { bytes }
+ };
+
+ let slots_permit = Arc::new(Semaphore::new(if config.max_in_flight ==
0 {
+ usize::MAX
+ } else {
+ config.max_in_flight
+ }));
+
for _ in 0..num_shards {
let stop_rx = stop_tx.subscribe();
shards.push(Shard::new(
core.clone(),
config.clone(),
+ slots_permit.clone(),
err_tx.clone(),
stop_rx,
));
}
- let bytes_permit = {
- let bytes = config.max_buffer_size.as_bytes_usize();
- if bytes == 0 { usize::MAX } else { bytes }
- };
-
- let slot_permit = if config.max_in_flight == 0 {
- usize::MAX
- } else {
- config.max_in_flight
- };
-
Self {
shards,
config,
closed: AtomicBool::new(false),
bytes_permit: Arc::new(Semaphore::new(bytes_permit)),
- slots_permit: Arc::new(Semaphore::new(slot_permit)),
stop_tx,
_join_handle: handle,
}
@@ -163,41 +162,6 @@ impl ProducerDispatcher {
},
};
- let permit_slot = match self.slots_permit.clone().try_acquire_owned() {
- Ok(perm) => perm,
- Err(_) => match self.config.failure_mode {
- BackpressureMode::FailImmediately => {
- drop(permit_bytes);
- return Err(IggyError::BackgroundSendError);
- }
- BackpressureMode::Block => match
self.slots_permit.clone().acquire_owned().await {
- Ok(perm) => perm,
- Err(_) => {
- drop(permit_bytes);
- return Err(IggyError::BackgroundSendError);
- }
- },
- BackpressureMode::BlockWithTimeout(timeout_dur) => {
- match tokio::time::timeout(
- timeout_dur.get_duration(),
- self.slots_permit.clone().acquire_owned(),
- )
- .await
- {
- Ok(Ok(perm)) => perm,
- Ok(Err(_)) => {
- drop(permit_bytes);
- return Err(IggyError::BackgroundSendError);
- }
- Err(_) => {
- drop(permit_bytes);
- return Err(IggyError::BackgroundSendTimeout);
- }
- }
- }
- },
- };
-
let shard_ix = self.config.sharding.pick_shard(
self.shards.len(),
&shard_message.messages,
@@ -208,11 +172,7 @@ impl ProducerDispatcher {
let shard = &self.shards[shard_ix];
shard
- .send(ShardMessageWithPermits::new(
- shard_message,
- permit_bytes,
- permit_slot,
- ))
+ .send(ShardMessageWithPermit::new(shard_message, permit_bytes))
.await
}
diff --git a/core/sdk/src/clients/producer_sharding.rs
b/core/sdk/src/clients/producer_sharding.rs
index 545212984..49cf2fd64 100644
--- a/core/sdk/src/clients/producer_sharding.rs
+++ b/core/sdk/src/clients/producer_sharding.rs
@@ -23,7 +23,7 @@ use std::hash::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use tokio::sync::{OwnedSemaphorePermit, broadcast};
+use tokio::sync::{OwnedSemaphorePermit, Semaphore, broadcast};
use tokio::task::JoinHandle;
use tracing::{debug, error};
@@ -108,28 +108,22 @@ impl Sizeable for ShardMessage {
}
}
-pub struct ShardMessageWithPermits {
+pub struct ShardMessageWithPermit {
pub inner: ShardMessage,
_bytes_permit: Option<OwnedSemaphorePermit>,
- _slot_permit: Option<OwnedSemaphorePermit>,
}
-impl ShardMessageWithPermits {
- pub fn new(
- msg: ShardMessage,
- permit_bytes: OwnedSemaphorePermit,
- permit_slot: OwnedSemaphorePermit,
- ) -> Self {
+impl ShardMessageWithPermit {
+ pub fn new(msg: ShardMessage, permit_bytes: OwnedSemaphorePermit) -> Self {
Self {
inner: msg,
_bytes_permit: Some(permit_bytes),
- _slot_permit: Some(permit_slot),
}
}
}
pub struct Shard {
- tx: flume::Sender<ShardMessageWithPermits>,
+ tx: flume::Sender<ShardMessageWithPermit>,
closed: Arc<AtomicBool>,
pub(crate) _handle: JoinHandle<()>,
}
@@ -138,10 +132,11 @@ impl Shard {
pub fn new(
core: Arc<impl ProducerCoreBackend>,
config: Arc<BackgroundConfig>,
+ slots_permit: Arc<Semaphore>,
err_sender: flume::Sender<ErrorCtx>,
mut stop_rx: broadcast::Receiver<()>,
) -> Self {
- let (tx, rx) = flume::bounded::<ShardMessageWithPermits>(256);
+ let (tx, rx) = flume::bounded::<ShardMessageWithPermit>(256);
let closed = Arc::new(AtomicBool::new(false));
let closed_clone = closed.clone();
@@ -176,7 +171,7 @@ impl Shard {
exceed_batch_size,
);
- Self::flush_buffer(&core, &mut buffer,
&mut buffer_bytes, &err_sender).await;
+ Self::flush_buffer(&core, &slots_permit,
&mut buffer, &mut buffer_bytes, &err_sender).await;
debug!(
new_buffer_len = buffer.len(),
new_buffer_bytes = buffer_bytes,
@@ -191,7 +186,7 @@ impl Shard {
}
_ = tokio::time::sleep_until(deadline) => {
if !buffer.is_empty() {
- Self::flush_buffer(&core, &mut buffer, &mut
buffer_bytes, &err_sender).await;
+ Self::flush_buffer(&core, &slots_permit, &mut
buffer, &mut buffer_bytes, &err_sender).await;
}
last_flush = tokio::time::Instant::now();
}
@@ -202,7 +197,7 @@ impl Shard {
buffer.push(msg);
}
if !buffer.is_empty() {
- Self::flush_buffer(&core, &mut buffer, &mut
buffer_bytes, &err_sender).await;
+ Self::flush_buffer(&core, &slots_permit, &mut
buffer, &mut buffer_bytes, &err_sender).await;
}
break;
}
@@ -219,7 +214,8 @@ impl Shard {
async fn flush_buffer(
core: &Arc<impl ProducerCoreBackend>,
- buffer: &mut Vec<ShardMessageWithPermits>,
+ slots_permit: &Arc<Semaphore>,
+ buffer: &mut Vec<ShardMessageWithPermit>,
buffer_bytes: &mut usize,
err_sender: &flume::Sender<ErrorCtx>,
) {
@@ -227,7 +223,7 @@ impl Shard {
return;
}
- let mut merged_batches: Vec<ShardMessageWithPermits> = Vec::new();
+ let mut merged_batches: Vec<ShardMessageWithPermit> = Vec::new();
for msg in buffer.drain(..) {
if let Some(last) = merged_batches.last_mut()
&& Self::same_destination(&last.inner, &msg.inner)
@@ -239,6 +235,8 @@ impl Shard {
}
for msg in merged_batches {
+ let _slot_permit = slots_permit.acquire().await;
+
let result = core
.send_internal(
&msg.inner.stream,
@@ -274,7 +272,7 @@ impl Shard {
*buffer_bytes = 0;
}
- pub(crate) async fn send(&self, message: ShardMessageWithPermits) ->
Result<(), IggyError> {
+ pub(crate) async fn send(&self, message: ShardMessageWithPermit) ->
Result<(), IggyError> {
if self.closed.load(Ordering::Acquire) {
return Err(IggyError::ProducerClosed);
}
@@ -299,7 +297,7 @@ mod tests {
use bytes::Bytes;
use iggy_common::IggyDuration;
use std::time::Duration;
- use tokio::{sync::Semaphore, time::sleep};
+ use tokio::time::sleep;
fn dummy_identifier() -> Arc<Identifier> {
Arc::new(Identifier::numeric(1).unwrap())
@@ -325,13 +323,17 @@ mod tests {
.batch_size(10_000);
let config = Arc::new(bb.build());
- let (permit_bytes, permit_slot) = (
- Arc::new(Semaphore::new(100_000)),
- Arc::new(Semaphore::new(100)),
- );
+ let permit_bytes = Arc::new(Semaphore::new(100_000));
+ let slots_permit = Arc::new(Semaphore::new(100));
let (_stop_tx, stop_rx) = broadcast::channel(1);
- let shard = Shard::new(Arc::new(mock), config, flume::unbounded().0,
stop_rx);
+ let shard = Shard::new(
+ Arc::new(mock),
+ config,
+ slots_permit,
+ flume::unbounded().0,
+ stop_rx,
+ );
for _ in 0..10 {
let message = ShardMessage {
@@ -340,10 +342,9 @@ mod tests {
messages: vec![dummy_message(1)],
partitioning: None,
};
- let wrapped = ShardMessageWithPermits::new(
+ let wrapped = ShardMessageWithPermit::new(
message,
permit_bytes.clone().acquire_many_owned(1).await.unwrap(),
- permit_slot.clone().acquire_owned().await.unwrap(),
);
shard.send(wrapped).await.unwrap();
}
@@ -364,13 +365,17 @@ mod tests {
.batch_size(10_000);
let config = Arc::new(bb.build());
- let (permit_bytes, permit_slot) = (
- Arc::new(Semaphore::new(10_000)),
- Arc::new(Semaphore::new(100)),
- );
+ let permit_bytes = Arc::new(Semaphore::new(10_000));
+ let slots_permit = Arc::new(Semaphore::new(100));
let (_stop_tx, stop_rx) = broadcast::channel(1);
- let shard = Shard::new(Arc::new(mock), config, flume::unbounded().0,
stop_rx);
+ let shard = Shard::new(
+ Arc::new(mock),
+ config,
+ slots_permit,
+ flume::unbounded().0,
+ stop_rx,
+ );
let message = ShardMessage {
stream: dummy_identifier(),
@@ -378,14 +383,13 @@ mod tests {
messages: vec![dummy_message(10_000)],
partitioning: None,
};
- let wrapped = ShardMessageWithPermits::new(
+ let wrapped = ShardMessageWithPermit::new(
message,
permit_bytes
.clone()
.acquire_many_owned(10_000)
.await
.unwrap(),
- permit_slot.clone().acquire_owned().await.unwrap(),
);
shard.send(wrapped).await.unwrap();
@@ -405,13 +409,17 @@ mod tests {
.batch_size(10_000);
let config = Arc::new(bb.build());
- let (permit_bytes, permit_slot) = (
- Arc::new(Semaphore::new(10_000)),
- Arc::new(Semaphore::new(100)),
- );
+ let permit_bytes = Arc::new(Semaphore::new(10_000));
+ let slots_permit = Arc::new(Semaphore::new(100));
let (_stop_tx, stop_rx) = broadcast::channel(1);
- let shard = Shard::new(Arc::new(mock), config, flume::unbounded().0,
stop_rx);
+ let shard = Shard::new(
+ Arc::new(mock),
+ config,
+ slots_permit,
+ flume::unbounded().0,
+ stop_rx,
+ );
let message = ShardMessage {
stream: dummy_identifier(),
@@ -419,10 +427,9 @@ mod tests {
messages: vec![dummy_message(1)],
partitioning: None,
};
- let wrapped = ShardMessageWithPermits::new(
+ let wrapped = ShardMessageWithPermit::new(
message,
permit_bytes.clone().acquire_many_owned(1).await.unwrap(),
- permit_slot.clone().acquire_owned().await.unwrap(),
);
shard.send(wrapped).await.unwrap();
@@ -448,13 +455,11 @@ mod tests {
let bb = BackgroundConfig::builder();
let config = Arc::new(bb.build());
- let (permit_bytes, permit_slot) = (
- Arc::new(Semaphore::new(10_000)),
- Arc::new(Semaphore::new(100)),
- );
+ let permit_bytes = Arc::new(Semaphore::new(10_000));
+ let slots_permit = Arc::new(Semaphore::new(100));
let (_stop_tx, stop_rx) = broadcast::channel(1);
- let shard = Shard::new(Arc::new(mock), config, err_tx, stop_rx);
+ let shard = Shard::new(Arc::new(mock), config, slots_permit, err_tx,
stop_rx);
let message = ShardMessage {
stream: dummy_identifier(),
@@ -462,10 +467,9 @@ mod tests {
messages: vec![dummy_message(1)],
partitioning: None,
};
- let wrapped = ShardMessageWithPermits::new(
+ let wrapped = ShardMessageWithPermit::new(
message,
permit_bytes.clone().acquire_many_owned(1).await.unwrap(),
- permit_slot.clone().acquire_owned().await.unwrap(),
);
shard.send(wrapped).await.unwrap();
@@ -476,7 +480,7 @@ mod tests {
#[tokio::test]
async fn test_shard_send_error_on_closed_channel() {
- let (tx, rx) = flume::bounded::<ShardMessageWithPermits>(1);
+ let (tx, rx) = flume::bounded::<ShardMessageWithPermit>(1);
drop(rx);
let shard = Shard {
@@ -485,10 +489,7 @@ mod tests {
_handle: tokio::spawn(async {}),
};
- let (permit_bytes, permit_slot) = (
- Arc::new(Semaphore::new(10_000)),
- Arc::new(Semaphore::new(100)),
- );
+ let permit_bytes = Arc::new(Semaphore::new(10_000));
let message = ShardMessage {
stream: dummy_identifier(),
@@ -496,10 +497,9 @@ mod tests {
messages: vec![dummy_message(1)],
partitioning: None,
};
- let wrapped = ShardMessageWithPermits::new(
+ let wrapped = ShardMessageWithPermit::new(
message,
permit_bytes.clone().acquire_many_owned(1).await.unwrap(),
- permit_slot.clone().acquire_owned().await.unwrap(),
);
let result = shard.send(wrapped).await;