This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b700d85b fix(sdk): acquire max_in_flight permits at network send, not dispatch (#2625)
6b700d85b is described below

commit 6b700d85b21e619ab93d77b4e9ba1e6517603f30
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Jan 28 09:48:54 2026 +0100

    fix(sdk): acquire max_in_flight permits at network send, not dispatch (#2625)
    
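    Slot permits were acquired in dispatch() and held until the flush
    completed. With max_in_flight=1, each message therefore blocked for up to
    linger_time before the next one could be buffered: with linger_time set
    to 50 ms, 1000 messages took 50+ seconds instead of ~50 ms.
    max_in_flight should limit concurrent network requests, not buffer depth,
    so the slot semaphore is now shared by all shards and a permit is acquired
    in the shard flush path immediately before each network send.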
---
 Cargo.lock                                  |   2 +-
 DEPENDENCIES.md                             |   2 +-
 core/sdk/Cargo.toml                         |   2 +-
 core/sdk/src/clients/producer_dispatcher.rs |  68 ++++--------------
 core/sdk/src/clients/producer_sharding.rs   | 108 ++++++++++++++--------------
 5 files changed, 71 insertions(+), 111 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index f7eeda3fb..5b992c303 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4429,7 +4429,7 @@ dependencies = [
 
 [[package]]
 name = "iggy"
-version = "0.8.1-edge.6"
+version = "0.8.1-edge.7"
 dependencies = [
  "async-broadcast",
  "async-dropper",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index c00b459ed..94277cfcc 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -385,7 +385,7 @@ icu_provider: 2.1.1, "Unicode-3.0",
 ident_case: 1.0.1, "Apache-2.0 OR MIT",
 idna: 1.1.0, "Apache-2.0 OR MIT",
 idna_adapter: 1.2.1, "Apache-2.0 OR MIT",
-iggy: 0.8.1-edge.6, "Apache-2.0",
+iggy: 0.8.1-edge.7, "Apache-2.0",
 iggy-bench: 0.3.1-edge.2, "Apache-2.0",
 iggy-bench-dashboard-server: 0.5.1-edge.1, "Apache-2.0",
 iggy-cli: 0.10.1-edge.1, "Apache-2.0",
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index 6d33647bb..3f16531c6 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy"
-version = "0.8.1-edge.6"
+version = "0.8.1-edge.7"
 description = "Iggy is the persistent message streaming platform written in 
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing 
millions of messages per second."
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/sdk/src/clients/producer_dispatcher.rs 
b/core/sdk/src/clients/producer_dispatcher.rs
index c8ab06423..16d2a0e79 100644
--- a/core/sdk/src/clients/producer_dispatcher.rs
+++ b/core/sdk/src/clients/producer_dispatcher.rs
@@ -18,7 +18,7 @@
 use crate::clients::producer::ProducerCoreBackend;
 use crate::clients::producer_config::{BackgroundConfig, BackpressureMode};
 use crate::clients::producer_error_callback::ErrorCtx;
-use crate::clients::producer_sharding::{Shard, ShardMessage, 
ShardMessageWithPermits};
+use crate::clients::producer_sharding::{Shard, ShardMessage, 
ShardMessageWithPermit};
 use futures::FutureExt;
 use iggy_common::{Identifier, IggyError, IggyMessage, Partitioning, Sizeable};
 use std::sync::Arc;
@@ -30,7 +30,6 @@ pub struct ProducerDispatcher {
     shards: Vec<Shard>,
     config: Arc<BackgroundConfig>,
     closed: AtomicBool,
-    slots_permit: Arc<Semaphore>,
     bytes_permit: Arc<Semaphore>,
     stop_tx: broadcast::Sender<()>,
     _join_handle: JoinHandle<()>,
@@ -75,33 +74,33 @@ impl ProducerDispatcher {
             }
         });
 
+        let bytes_permit = {
+            let bytes = config.max_buffer_size.as_bytes_usize();
+            if bytes == 0 { usize::MAX } else { bytes }
+        };
+
+        let slots_permit = Arc::new(Semaphore::new(if config.max_in_flight == 
0 {
+            usize::MAX
+        } else {
+            config.max_in_flight
+        }));
+
         for _ in 0..num_shards {
             let stop_rx = stop_tx.subscribe();
             shards.push(Shard::new(
                 core.clone(),
                 config.clone(),
+                slots_permit.clone(),
                 err_tx.clone(),
                 stop_rx,
             ));
         }
 
-        let bytes_permit = {
-            let bytes = config.max_buffer_size.as_bytes_usize();
-            if bytes == 0 { usize::MAX } else { bytes }
-        };
-
-        let slot_permit = if config.max_in_flight == 0 {
-            usize::MAX
-        } else {
-            config.max_in_flight
-        };
-
         Self {
             shards,
             config,
             closed: AtomicBool::new(false),
             bytes_permit: Arc::new(Semaphore::new(bytes_permit)),
-            slots_permit: Arc::new(Semaphore::new(slot_permit)),
             stop_tx,
             _join_handle: handle,
         }
@@ -163,41 +162,6 @@ impl ProducerDispatcher {
             },
         };
 
-        let permit_slot = match self.slots_permit.clone().try_acquire_owned() {
-            Ok(perm) => perm,
-            Err(_) => match self.config.failure_mode {
-                BackpressureMode::FailImmediately => {
-                    drop(permit_bytes);
-                    return Err(IggyError::BackgroundSendError);
-                }
-                BackpressureMode::Block => match 
self.slots_permit.clone().acquire_owned().await {
-                    Ok(perm) => perm,
-                    Err(_) => {
-                        drop(permit_bytes);
-                        return Err(IggyError::BackgroundSendError);
-                    }
-                },
-                BackpressureMode::BlockWithTimeout(timeout_dur) => {
-                    match tokio::time::timeout(
-                        timeout_dur.get_duration(),
-                        self.slots_permit.clone().acquire_owned(),
-                    )
-                    .await
-                    {
-                        Ok(Ok(perm)) => perm,
-                        Ok(Err(_)) => {
-                            drop(permit_bytes);
-                            return Err(IggyError::BackgroundSendError);
-                        }
-                        Err(_) => {
-                            drop(permit_bytes);
-                            return Err(IggyError::BackgroundSendTimeout);
-                        }
-                    }
-                }
-            },
-        };
-
         let shard_ix = self.config.sharding.pick_shard(
             self.shards.len(),
             &shard_message.messages,
@@ -208,11 +172,7 @@ impl ProducerDispatcher {
         let shard = &self.shards[shard_ix];
 
         shard
-            .send(ShardMessageWithPermits::new(
-                shard_message,
-                permit_bytes,
-                permit_slot,
-            ))
+            .send(ShardMessageWithPermit::new(shard_message, permit_bytes))
             .await
     }
 
diff --git a/core/sdk/src/clients/producer_sharding.rs 
b/core/sdk/src/clients/producer_sharding.rs
index 545212984..49cf2fd64 100644
--- a/core/sdk/src/clients/producer_sharding.rs
+++ b/core/sdk/src/clients/producer_sharding.rs
@@ -23,7 +23,7 @@ use std::hash::DefaultHasher;
 use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use tokio::sync::{OwnedSemaphorePermit, broadcast};
+use tokio::sync::{OwnedSemaphorePermit, Semaphore, broadcast};
 use tokio::task::JoinHandle;
 use tracing::{debug, error};
 
@@ -108,28 +108,22 @@ impl Sizeable for ShardMessage {
     }
 }
 
-pub struct ShardMessageWithPermits {
+pub struct ShardMessageWithPermit {
     pub inner: ShardMessage,
     _bytes_permit: Option<OwnedSemaphorePermit>,
-    _slot_permit: Option<OwnedSemaphorePermit>,
 }
 
-impl ShardMessageWithPermits {
-    pub fn new(
-        msg: ShardMessage,
-        permit_bytes: OwnedSemaphorePermit,
-        permit_slot: OwnedSemaphorePermit,
-    ) -> Self {
+impl ShardMessageWithPermit {
+    pub fn new(msg: ShardMessage, permit_bytes: OwnedSemaphorePermit) -> Self {
         Self {
             inner: msg,
             _bytes_permit: Some(permit_bytes),
-            _slot_permit: Some(permit_slot),
         }
     }
 }
 
 pub struct Shard {
-    tx: flume::Sender<ShardMessageWithPermits>,
+    tx: flume::Sender<ShardMessageWithPermit>,
     closed: Arc<AtomicBool>,
     pub(crate) _handle: JoinHandle<()>,
 }
@@ -138,10 +132,11 @@ impl Shard {
     pub fn new(
         core: Arc<impl ProducerCoreBackend>,
         config: Arc<BackgroundConfig>,
+        slots_permit: Arc<Semaphore>,
         err_sender: flume::Sender<ErrorCtx>,
         mut stop_rx: broadcast::Receiver<()>,
     ) -> Self {
-        let (tx, rx) = flume::bounded::<ShardMessageWithPermits>(256);
+        let (tx, rx) = flume::bounded::<ShardMessageWithPermit>(256);
         let closed = Arc::new(AtomicBool::new(false));
 
         let closed_clone = closed.clone();
@@ -176,7 +171,7 @@ impl Shard {
                                         exceed_batch_size,
                                     );
 
-                                    Self::flush_buffer(&core, &mut buffer, 
&mut buffer_bytes, &err_sender).await;
+                                    Self::flush_buffer(&core, &slots_permit, 
&mut buffer, &mut buffer_bytes, &err_sender).await;
                                     debug!(
                                         new_buffer_len = buffer.len(),
                                         new_buffer_bytes = buffer_bytes,
@@ -191,7 +186,7 @@ impl Shard {
                     }
                     _ = tokio::time::sleep_until(deadline) => {
                         if !buffer.is_empty() {
-                            Self::flush_buffer(&core, &mut buffer, &mut 
buffer_bytes, &err_sender).await;
+                            Self::flush_buffer(&core, &slots_permit, &mut 
buffer, &mut buffer_bytes, &err_sender).await;
                         }
                         last_flush = tokio::time::Instant::now();
                     }
@@ -202,7 +197,7 @@ impl Shard {
                             buffer.push(msg);
                         }
                         if !buffer.is_empty() {
-                            Self::flush_buffer(&core, &mut buffer, &mut 
buffer_bytes, &err_sender).await;
+                            Self::flush_buffer(&core, &slots_permit, &mut 
buffer, &mut buffer_bytes, &err_sender).await;
                         }
                         break;
                     }
@@ -219,7 +214,8 @@ impl Shard {
 
     async fn flush_buffer(
         core: &Arc<impl ProducerCoreBackend>,
-        buffer: &mut Vec<ShardMessageWithPermits>,
+        slots_permit: &Arc<Semaphore>,
+        buffer: &mut Vec<ShardMessageWithPermit>,
         buffer_bytes: &mut usize,
         err_sender: &flume::Sender<ErrorCtx>,
     ) {
@@ -227,7 +223,7 @@ impl Shard {
             return;
         }
 
-        let mut merged_batches: Vec<ShardMessageWithPermits> = Vec::new();
+        let mut merged_batches: Vec<ShardMessageWithPermit> = Vec::new();
         for msg in buffer.drain(..) {
             if let Some(last) = merged_batches.last_mut()
                 && Self::same_destination(&last.inner, &msg.inner)
@@ -239,6 +235,8 @@ impl Shard {
         }
 
         for msg in merged_batches {
+            let _slot_permit = slots_permit.acquire().await;
+
             let result = core
                 .send_internal(
                     &msg.inner.stream,
@@ -274,7 +272,7 @@ impl Shard {
         *buffer_bytes = 0;
     }
 
-    pub(crate) async fn send(&self, message: ShardMessageWithPermits) -> 
Result<(), IggyError> {
+    pub(crate) async fn send(&self, message: ShardMessageWithPermit) -> 
Result<(), IggyError> {
         if self.closed.load(Ordering::Acquire) {
             return Err(IggyError::ProducerClosed);
         }
@@ -299,7 +297,7 @@ mod tests {
     use bytes::Bytes;
     use iggy_common::IggyDuration;
     use std::time::Duration;
-    use tokio::{sync::Semaphore, time::sleep};
+    use tokio::time::sleep;
 
     fn dummy_identifier() -> Arc<Identifier> {
         Arc::new(Identifier::numeric(1).unwrap())
@@ -325,13 +323,17 @@ mod tests {
             .batch_size(10_000);
         let config = Arc::new(bb.build());
 
-        let (permit_bytes, permit_slot) = (
-            Arc::new(Semaphore::new(100_000)),
-            Arc::new(Semaphore::new(100)),
-        );
+        let permit_bytes = Arc::new(Semaphore::new(100_000));
+        let slots_permit = Arc::new(Semaphore::new(100));
 
         let (_stop_tx, stop_rx) = broadcast::channel(1);
-        let shard = Shard::new(Arc::new(mock), config, flume::unbounded().0, 
stop_rx);
+        let shard = Shard::new(
+            Arc::new(mock),
+            config,
+            slots_permit,
+            flume::unbounded().0,
+            stop_rx,
+        );
 
         for _ in 0..10 {
             let message = ShardMessage {
@@ -340,10 +342,9 @@ mod tests {
                 messages: vec![dummy_message(1)],
                 partitioning: None,
             };
-            let wrapped = ShardMessageWithPermits::new(
+            let wrapped = ShardMessageWithPermit::new(
                 message,
                 permit_bytes.clone().acquire_many_owned(1).await.unwrap(),
-                permit_slot.clone().acquire_owned().await.unwrap(),
             );
             shard.send(wrapped).await.unwrap();
         }
@@ -364,13 +365,17 @@ mod tests {
             .batch_size(10_000);
         let config = Arc::new(bb.build());
 
-        let (permit_bytes, permit_slot) = (
-            Arc::new(Semaphore::new(10_000)),
-            Arc::new(Semaphore::new(100)),
-        );
+        let permit_bytes = Arc::new(Semaphore::new(10_000));
+        let slots_permit = Arc::new(Semaphore::new(100));
 
         let (_stop_tx, stop_rx) = broadcast::channel(1);
-        let shard = Shard::new(Arc::new(mock), config, flume::unbounded().0, 
stop_rx);
+        let shard = Shard::new(
+            Arc::new(mock),
+            config,
+            slots_permit,
+            flume::unbounded().0,
+            stop_rx,
+        );
 
         let message = ShardMessage {
             stream: dummy_identifier(),
@@ -378,14 +383,13 @@ mod tests {
             messages: vec![dummy_message(10_000)],
             partitioning: None,
         };
-        let wrapped = ShardMessageWithPermits::new(
+        let wrapped = ShardMessageWithPermit::new(
             message,
             permit_bytes
                 .clone()
                 .acquire_many_owned(10_000)
                 .await
                 .unwrap(),
-            permit_slot.clone().acquire_owned().await.unwrap(),
         );
         shard.send(wrapped).await.unwrap();
 
@@ -405,13 +409,17 @@ mod tests {
             .batch_size(10_000);
         let config = Arc::new(bb.build());
 
-        let (permit_bytes, permit_slot) = (
-            Arc::new(Semaphore::new(10_000)),
-            Arc::new(Semaphore::new(100)),
-        );
+        let permit_bytes = Arc::new(Semaphore::new(10_000));
+        let slots_permit = Arc::new(Semaphore::new(100));
 
         let (_stop_tx, stop_rx) = broadcast::channel(1);
-        let shard = Shard::new(Arc::new(mock), config, flume::unbounded().0, 
stop_rx);
+        let shard = Shard::new(
+            Arc::new(mock),
+            config,
+            slots_permit,
+            flume::unbounded().0,
+            stop_rx,
+        );
 
         let message = ShardMessage {
             stream: dummy_identifier(),
@@ -419,10 +427,9 @@ mod tests {
             messages: vec![dummy_message(1)],
             partitioning: None,
         };
-        let wrapped = ShardMessageWithPermits::new(
+        let wrapped = ShardMessageWithPermit::new(
             message,
             permit_bytes.clone().acquire_many_owned(1).await.unwrap(),
-            permit_slot.clone().acquire_owned().await.unwrap(),
         );
         shard.send(wrapped).await.unwrap();
 
@@ -448,13 +455,11 @@ mod tests {
         let bb = BackgroundConfig::builder();
         let config = Arc::new(bb.build());
 
-        let (permit_bytes, permit_slot) = (
-            Arc::new(Semaphore::new(10_000)),
-            Arc::new(Semaphore::new(100)),
-        );
+        let permit_bytes = Arc::new(Semaphore::new(10_000));
+        let slots_permit = Arc::new(Semaphore::new(100));
 
         let (_stop_tx, stop_rx) = broadcast::channel(1);
-        let shard = Shard::new(Arc::new(mock), config, err_tx, stop_rx);
+        let shard = Shard::new(Arc::new(mock), config, slots_permit, err_tx, 
stop_rx);
 
         let message = ShardMessage {
             stream: dummy_identifier(),
@@ -462,10 +467,9 @@ mod tests {
             messages: vec![dummy_message(1)],
             partitioning: None,
         };
-        let wrapped = ShardMessageWithPermits::new(
+        let wrapped = ShardMessageWithPermit::new(
             message,
             permit_bytes.clone().acquire_many_owned(1).await.unwrap(),
-            permit_slot.clone().acquire_owned().await.unwrap(),
         );
         shard.send(wrapped).await.unwrap();
 
@@ -476,7 +480,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_shard_send_error_on_closed_channel() {
-        let (tx, rx) = flume::bounded::<ShardMessageWithPermits>(1);
+        let (tx, rx) = flume::bounded::<ShardMessageWithPermit>(1);
         drop(rx);
 
         let shard = Shard {
@@ -485,10 +489,7 @@ mod tests {
             _handle: tokio::spawn(async {}),
         };
 
-        let (permit_bytes, permit_slot) = (
-            Arc::new(Semaphore::new(10_000)),
-            Arc::new(Semaphore::new(100)),
-        );
+        let permit_bytes = Arc::new(Semaphore::new(10_000));
 
         let message = ShardMessage {
             stream: dummy_identifier(),
@@ -496,10 +497,9 @@ mod tests {
             messages: vec![dummy_message(1)],
             partitioning: None,
         };
-        let wrapped = ShardMessageWithPermits::new(
+        let wrapped = ShardMessageWithPermit::new(
             message,
             permit_bytes.clone().acquire_many_owned(1).await.unwrap(),
-            permit_slot.clone().acquire_owned().await.unwrap(),
         );
 
         let result = shard.send(wrapped).await;

Reply via email to