This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch feat/add-background-send
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 2d93b889e3787305e9c66bdca2b8315adb15c129
Author: haze518 <[email protected]>
AuthorDate: Wed Jun 4 14:56:16 2025 +0600

    del
---
 .../tests/examples/test_new_publisher.rs           |   4 +-
 core/sdk/src/clients/producer_builder.rs           |   5 +-
 core/sdk/src/clients/producer_dispatcher.rs        | 110 +++++++++++----------
 core/sdk/src/clients/producer_sharding.rs          |  54 +++++-----
 4 files changed, 95 insertions(+), 78 deletions(-)

diff --git a/core/integration/tests/examples/test_new_publisher.rs 
b/core/integration/tests/examples/test_new_publisher.rs
index adc3b5ee..0b5ddc0f 100644
--- a/core/integration/tests/examples/test_new_publisher.rs
+++ b/core/integration/tests/examples/test_new_publisher.rs
@@ -43,8 +43,8 @@ async fn test_new_publisher() {
     let producer = client
         .producer("1", "1")
         .unwrap()
-        .background(|b| b.batch_length(100).max_in_flight(100))
-        // .sync(|b| b.batch_length(10))
+        .background(|b| b.batch_length(100).max_in_flight(400))
+        // .sync(|b| b.batch_length(100))
         // .send_mode(SendMode::Background)
         .build();
 
diff --git a/core/sdk/src/clients/producer_builder.rs 
b/core/sdk/src/clients/producer_builder.rs
index f7582735..3030aa5f 100644
--- a/core/sdk/src/clients/producer_builder.rs
+++ b/core/sdk/src/clients/producer_builder.rs
@@ -44,8 +44,9 @@ pub struct BackgroundBuilder {
 
 impl Default for BackgroundBuilder {
     fn default() -> Self {
+        let num_shards = default_shard_count();
         BackgroundBuilder {
-            num_shards: Some(default_shard_count()),
+            num_shards: Some(num_shards),
             sharding: Box::new(BalancedSharding::default()),
             error_callback: Box::new(LogErrorCallback::default()),
             batch_size: Some(1_048_576),
@@ -53,7 +54,7 @@ impl Default for BackgroundBuilder {
             failure_mode: Some(BackpressureMode::Block),
             max_buffer_size: Some(IggyByteSize::from(32 * 1_048_576)),
             linger_time: Some(IggyDuration::from(1000)),
-            max_in_flight: Some(default_shard_count()),
+            max_in_flight: Some(num_shards * num_shards),
         }
     }
 }
diff --git a/core/sdk/src/clients/producer_dispatcher.rs 
b/core/sdk/src/clients/producer_dispatcher.rs
index b1c34ee2..96c146c3 100644
--- a/core/sdk/src/clients/producer_dispatcher.rs
+++ b/core/sdk/src/clients/producer_dispatcher.rs
@@ -108,7 +108,7 @@ impl ProducerDispatcher {
         }
 
         let start = Instant::now();
-    
+
         let shard_message = ShardMessage {
             messages,
             stream,
@@ -117,63 +117,73 @@ impl ProducerDispatcher {
         };
         let batch_bytes = shard_message.get_size_bytes();
 
-        let permit_bytes = match self.config.failure_mode {
-            BackpressureMode::Block => self
-                .bytes_permit
-                .clone()
-                .acquire_many_owned(batch_bytes.as_bytes_u32())
-                .await
-                .map_err(|_| IggyError::BackgroundSendError)?,
-            BackpressureMode::BlockWithTimeout(t) => {
-                match tokio::time::timeout(
-                    t.get_duration(),
-                    self.bytes_permit
-                        .clone()
-                        .acquire_many_owned(batch_bytes.as_bytes_u32()),
-                )
-                .await
-                {
-                    Ok(permit_res) => permit_res.map_err(|_| 
IggyError::BackgroundSendError)?,
-                    Err(_) => {
-                        return Err(IggyError::BackgroundSendTimeout);
+        let permit_bytes = match 
self.bytes_permit.clone().try_acquire_many_owned(batch_bytes.as_bytes_u32()) {
+            Ok(perm) => {
+                perm
+            }
+            Err(_) => {
+                match self.config.failure_mode {
+                    BackpressureMode::FailImmediately => {
+                        return Err(IggyError::BackgroundSendBufferOverflow);
+                    }
+                    BackpressureMode::Block => {
+                        self.bytes_permit
+                            .clone()
+                            .acquire_many_owned(batch_bytes.as_bytes_u32())
+                            .await
+                            .map_err(|_| IggyError::BackgroundSendError)?
+                    }
+                    BackpressureMode::BlockWithTimeout(timeout_dur) => {
+                        match tokio::time::timeout(
+                            timeout_dur.get_duration(),
+                            
self.bytes_permit.clone().acquire_many_owned(batch_bytes.as_bytes_u32()),
+                        )
+                        .await {
+                            Ok(Ok(perm)) => perm,
+                            Ok(Err(_)) => return 
Err(IggyError::BackgroundSendError),
+                            Err(_) => return 
Err(IggyError::BackgroundSendTimeout),
+                        }
                     }
                 }
             }
-            BackpressureMode::FailImmediately => self
-                .bytes_permit
-                .clone()
-                .try_acquire_many_owned(batch_bytes.as_bytes_u32())
-                .map_err(|_| IggyError::BackgroundSendBufferOverflow)?,
         };
 
-        let permit_slot = match self.config.failure_mode {
-            BackpressureMode::Block => self
-                .slots_permit
-                .clone()
-                .acquire_owned()
-                .await
-                .map_err(|_| IggyError::BackgroundSendError)?,
-            BackpressureMode::BlockWithTimeout(timeout_dur) => {
-                match tokio::time::timeout(
-                    timeout_dur.get_duration(),
-                    self.slots_permit.clone().acquire_owned(),
-                )
-                .await
-                {
-                    Ok(permit_res) => permit_res.map_err(|_| 
IggyError::BackgroundSendError)?,
-                    Err(_) => {
-                        drop(permit_bytes);
-                        return Err(IggyError::BackgroundSendTimeout);
-                    }
-                }
+        let permit_slot = match self.slots_permit.clone().try_acquire_owned() {
+            Ok(perm) => {
+                perm
             }
-            BackpressureMode::FailImmediately => {
-                match self.slots_permit.clone().try_acquire_owned() {
-                    Ok(p) => p,
-                    Err(_) => {
+            Err(_) => {
+                match self.config.failure_mode {
+                    BackpressureMode::FailImmediately => {
                         drop(permit_bytes);
                         return Err(IggyError::BackgroundSendError);
                     }
+                    BackpressureMode::Block => {
+                        match self.slots_permit.clone().acquire_owned().await {
+                            Ok(perm) => perm,
+                            Err(_) => {
+                                drop(permit_bytes);
+                                return Err(IggyError::BackgroundSendError)
+                            }
+                        }
+                    }
+                    BackpressureMode::BlockWithTimeout(timeout_dur) => {
+                        match tokio::time::timeout(
+                            timeout_dur.get_duration(),
+                            self.slots_permit.clone().acquire_owned(),
+                        )
+                        .await {
+                            Ok(Ok(perm)) => perm,
+                            Ok(Err(_)) => {
+                                drop(permit_bytes);
+                                return Err(IggyError::BackgroundSendError);
+                            }
+                            Err(_) => {
+                                drop(permit_bytes);
+                                return Err(IggyError::BackgroundSendTimeout);
+                            }
+                        }
+                    }
                 }
             }
         };
@@ -194,7 +204,7 @@ impl ProducerDispatcher {
             ))
             .await;
         
-        print!("dispatch: {:?}", start.elapsed());
+        // print!("dispatch: {:?}", start.elapsed());
         res
     }
 
diff --git a/core/sdk/src/clients/producer_sharding.rs 
b/core/sdk/src/clients/producer_sharding.rs
index d210e3c4..40c9ff38 100644
--- a/core/sdk/src/clients/producer_sharding.rs
+++ b/core/sdk/src/clients/producer_sharding.rs
@@ -92,13 +92,13 @@ impl Sizeable for ShardMessage {
 
 pub struct ShardMessageWithPermits {
     pub inner: ShardMessage,
-    _bytes_permit: OwnedSemaphorePermit,
-    _slot_permit:  OwnedSemaphorePermit,
+    _bytes_permit: Option<OwnedSemaphorePermit>,
+    _slot_permit:  Option<OwnedSemaphorePermit>,
 }
 
 impl ShardMessageWithPermits {
     pub fn new(msg: ShardMessage, permit_bytes: OwnedSemaphorePermit, 
permit_slot: OwnedSemaphorePermit) -> Self {
-        Self { inner: msg, _bytes_permit: permit_bytes, _slot_permit: 
permit_slot }
+        Self { inner: msg, _bytes_permit: Some(permit_bytes), _slot_permit: 
Some(permit_slot) }
     }
 }
 
@@ -115,7 +115,7 @@ impl Shard {
         config: Arc<BackgroundConfig>,
         err_sender: flume::Sender<ErrorCtx>,
     ) -> Self {
-        let (tx, rx) = flume::bounded::<ShardMessageWithPermits>(100);
+        let (tx, rx) = flume::bounded::<ShardMessageWithPermits>(256);
         let shutdown_notify = Arc::new(Notify::new());
         let closed = Arc::new(AtomicBool::new(false));
 
@@ -141,6 +141,7 @@ impl Shard {
                                     .map_or(false, |size| buffer_bytes >= 
size);
 
                                 if exceed_batch_len || exceed_batch_size {
+                                    println!("flush by size/len: 
buffer.len()={}, buffer_bytes={}", buffer.len(), buffer_bytes);
                                     Self::flush_buffer(&core, &mut buffer, 
&mut buffer_bytes, &err_sender).await;
                                     last_flush = tokio::time::Instant::now();
                                 }
@@ -150,6 +151,7 @@ impl Shard {
                     }
                     _ = tokio::time::sleep_until(deadline) => {
                         if !buffer.is_empty() {
+                            println!("flush by timeout: buffer.len()={}, 
buffer_bytes={}", buffer.len(), buffer_bytes);
                             Self::flush_buffer(&core, &mut buffer, &mut 
buffer_bytes, &err_sender).await;
                             last_flush = tokio::time::Instant::now();
                         }
@@ -180,37 +182,41 @@ impl Shard {
         err_sender: &flume::Sender<ErrorCtx>,
     ) {
         let start = Instant::now();
+        println!("buffer size: {}", buffer.len());
         // let mut tasks = FuturesUnordered::new();
         
-        for msg in buffer.drain(..) {
+        for mut msg in buffer.drain(..) {
             // let core = Arc::clone(core);
             // let err_sender = err_sender.clone();
             // tasks.push(async move {
-            let result = core
-                .send_internal(&msg.inner.stream, &msg.inner.topic, 
msg.inner.messages, msg.inner.partitioning.clone())
-                .await;
-
-            if let Err(err) = result {
-                if let IggyError::ProducerSendFailed { failed, cause } = &err {
-                    let ctx = ErrorCtx {
-                        cause: cause.clone(),
-                        stream: msg.inner.stream,
-                        topic: msg.inner.topic,
-                        partitioning: msg.inner.partitioning,
-                        messages: failed.clone(),
-                    };
-                    let _ = err_sender.send_async(ctx).await;
-                } else {
-                    tracing::error!("background send failed: {err}");
+                let result = core
+                    .send_internal(&msg.inner.stream, &msg.inner.topic, 
msg.inner.messages, msg.inner.partitioning.clone())
+                    .await;
+
+                if let Err(err) = result {
+                    if let IggyError::ProducerSendFailed { failed, cause } = 
&err {
+                        let ctx = ErrorCtx {
+                            cause: cause.clone(),
+                            stream: msg.inner.stream,
+                            topic: msg.inner.topic,
+                            partitioning: msg.inner.partitioning,
+                            messages: failed.clone(),
+                        };
+                        let _ = err_sender.send_async(ctx).await;
+                    } else {
+                        tracing::error!("background send failed: {err}");
+                    }
                 }
-            }
+
+                // drop(msg._bytes_permit.take().unwrap());
+                // drop(msg._slot_permit.take().unwrap());
             // });
         }
 
         // while tasks.next().await.is_some() {}
 
         *buffer_bytes = 0;
-        println!("flush_buffer: {:?}", start.elapsed())
+        // println!("flush_buffer: {:?}", start.elapsed())
     }
 
     pub(crate) async fn send(&self, message: ShardMessageWithPermits) -> 
Result<(), IggyError> {
@@ -225,7 +231,7 @@ impl Shard {
             IggyError::BackgroundSendError
         });
 
-        println!("send shard: {:?}", start.elapsed());
+        // println!("send shard: {:?}", start.elapsed());
 
         res
     }

Reply via email to