This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch feat/add-background-send in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 2d93b889e3787305e9c66bdca2b8315adb15c129 Author: haze518 <[email protected]> AuthorDate: Wed Jun 4 14:56:16 2025 +0600 del --- .../tests/examples/test_new_publisher.rs | 4 +- core/sdk/src/clients/producer_builder.rs | 5 +- core/sdk/src/clients/producer_dispatcher.rs | 110 +++++++++++---------- core/sdk/src/clients/producer_sharding.rs | 54 +++++----- 4 files changed, 95 insertions(+), 78 deletions(-) diff --git a/core/integration/tests/examples/test_new_publisher.rs b/core/integration/tests/examples/test_new_publisher.rs index adc3b5ee..0b5ddc0f 100644 --- a/core/integration/tests/examples/test_new_publisher.rs +++ b/core/integration/tests/examples/test_new_publisher.rs @@ -43,8 +43,8 @@ async fn test_new_publisher() { let producer = client .producer("1", "1") .unwrap() - .background(|b| b.batch_length(100).max_in_flight(100)) - // .sync(|b| b.batch_length(10)) + .background(|b| b.batch_length(100).max_in_flight(400)) + // .sync(|b| b.batch_length(100)) // .send_mode(SendMode::Background) .build(); diff --git a/core/sdk/src/clients/producer_builder.rs b/core/sdk/src/clients/producer_builder.rs index f7582735..3030aa5f 100644 --- a/core/sdk/src/clients/producer_builder.rs +++ b/core/sdk/src/clients/producer_builder.rs @@ -44,8 +44,9 @@ pub struct BackgroundBuilder { impl Default for BackgroundBuilder { fn default() -> Self { + let num_shards = default_shard_count(); BackgroundBuilder { - num_shards: Some(default_shard_count()), + num_shards: Some(num_shards), sharding: Box::new(BalancedSharding::default()), error_callback: Box::new(LogErrorCallback::default()), batch_size: Some(1_048_576), @@ -53,7 +54,7 @@ impl Default for BackgroundBuilder { failure_mode: Some(BackpressureMode::Block), max_buffer_size: Some(IggyByteSize::from(32 * 1_048_576)), linger_time: Some(IggyDuration::from(1000)), - max_in_flight: Some(default_shard_count()), + max_in_flight: Some(num_shards * num_shards), } } } diff --git a/core/sdk/src/clients/producer_dispatcher.rs b/core/sdk/src/clients/producer_dispatcher.rs index b1c34ee2..96c146c3 100644 --- a/core/sdk/src/clients/producer_dispatcher.rs +++ b/core/sdk/src/clients/producer_dispatcher.rs @@ -108,7 +108,7 @@ impl ProducerDispatcher { } let start = Instant::now(); - + let shard_message = ShardMessage { messages, stream, @@ -117,63 +117,73 @@ impl ProducerDispatcher { }; let batch_bytes = shard_message.get_size_bytes(); - let permit_bytes = match self.config.failure_mode { - BackpressureMode::Block => self - .bytes_permit - .clone() - .acquire_many_owned(batch_bytes.as_bytes_u32()) - .await - .map_err(|_| IggyError::BackgroundSendError)?, - BackpressureMode::BlockWithTimeout(t) => { - match tokio::time::timeout( - t.get_duration(), - self.bytes_permit - .clone() - .acquire_many_owned(batch_bytes.as_bytes_u32()), - ) - .await - { - Ok(permit_res) => permit_res.map_err(|_| IggyError::BackgroundSendError)?, - Err(_) => { - return Err(IggyError::BackgroundSendTimeout); + let permit_bytes = match self.bytes_permit.clone().try_acquire_many_owned(batch_bytes.as_bytes_u32()) { + Ok(perm) => { + perm + } + Err(_) => { + match self.config.failure_mode { + BackpressureMode::FailImmediately => { + return Err(IggyError::BackgroundSendBufferOverflow); + } + BackpressureMode::Block => { + self.bytes_permit + .clone() + .acquire_many_owned(batch_bytes.as_bytes_u32()) + .await + .map_err(|_| IggyError::BackgroundSendError)? + } + BackpressureMode::BlockWithTimeout(timeout_dur) => { + match tokio::time::timeout( + timeout_dur.get_duration(), + self.bytes_permit.clone().acquire_many_owned(batch_bytes.as_bytes_u32()), + ) + .await { + Ok(Ok(perm)) => perm, + Ok(Err(_)) => return Err(IggyError::BackgroundSendError), + Err(_) => return Err(IggyError::BackgroundSendTimeout), + } } } } - BackpressureMode::FailImmediately => self - .bytes_permit - .clone() - .try_acquire_many_owned(batch_bytes.as_bytes_u32()) - .map_err(|_| IggyError::BackgroundSendBufferOverflow)?, }; - let permit_slot = match self.config.failure_mode { - BackpressureMode::Block => self - .slots_permit - .clone() - .acquire_owned() - .await - .map_err(|_| IggyError::BackgroundSendError)?, - BackpressureMode::BlockWithTimeout(timeout_dur) => { - match tokio::time::timeout( - timeout_dur.get_duration(), - self.slots_permit.clone().acquire_owned(), - ) - .await - { - Ok(permit_res) => permit_res.map_err(|_| IggyError::BackgroundSendError)?, - Err(_) => { - drop(permit_bytes); - return Err(IggyError::BackgroundSendTimeout); - } - } + let permit_slot = match self.slots_permit.clone().try_acquire_owned() { + Ok(perm) => { + perm } - BackpressureMode::FailImmediately => { - match self.slots_permit.clone().try_acquire_owned() { - Ok(p) => p, - Err(_) => { + Err(_) => { + match self.config.failure_mode { + BackpressureMode::FailImmediately => { drop(permit_bytes); return Err(IggyError::BackgroundSendError); } + BackpressureMode::Block => { + match self.slots_permit.clone().acquire_owned().await { + Ok(perm) => perm, + Err(_) => { + drop(permit_bytes); + return Err(IggyError::BackgroundSendError) + } + } + } + BackpressureMode::BlockWithTimeout(timeout_dur) => { + match tokio::time::timeout( + timeout_dur.get_duration(), + self.slots_permit.clone().acquire_owned(), + ) + .await { + Ok(Ok(perm)) => perm, + Ok(Err(_)) => { + drop(permit_bytes); + return Err(IggyError::BackgroundSendError); + } + Err(_) => { + drop(permit_bytes); + return Err(IggyError::BackgroundSendTimeout); + } + } + } } } }; @@ -194,7 +204,7 @@ impl ProducerDispatcher { )) .await; - print!("dispatch: {:?}", start.elapsed()); + // print!("dispatch: {:?}", start.elapsed()); res } diff --git a/core/sdk/src/clients/producer_sharding.rs b/core/sdk/src/clients/producer_sharding.rs index d210e3c4..40c9ff38 100644 --- a/core/sdk/src/clients/producer_sharding.rs +++ b/core/sdk/src/clients/producer_sharding.rs @@ -92,13 +92,13 @@ impl Sizeable for ShardMessage { pub struct ShardMessageWithPermits { pub inner: ShardMessage, - _bytes_permit: OwnedSemaphorePermit, - _slot_permit: OwnedSemaphorePermit, + _bytes_permit: Option<OwnedSemaphorePermit>, + _slot_permit: Option<OwnedSemaphorePermit>, } impl ShardMessageWithPermits { pub fn new(msg: ShardMessage, permit_bytes: OwnedSemaphorePermit, permit_slot: OwnedSemaphorePermit) -> Self { - Self { inner: msg, _bytes_permit: permit_bytes, _slot_permit: permit_slot } + Self { inner: msg, _bytes_permit: Some(permit_bytes), _slot_permit: Some(permit_slot) } } } @@ -115,7 +115,7 @@ impl Shard { config: Arc<BackgroundConfig>, err_sender: flume::Sender<ErrorCtx>, ) -> Self { - let (tx, rx) = flume::bounded::<ShardMessageWithPermits>(100); + let (tx, rx) = flume::bounded::<ShardMessageWithPermits>(256); let shutdown_notify = Arc::new(Notify::new()); let closed = Arc::new(AtomicBool::new(false)); @@ -141,6 +141,7 @@ impl Shard { .map_or(false, |size| buffer_bytes >= size); if exceed_batch_len || exceed_batch_size { + println!("flush by size/len: buffer.len()={}, buffer_bytes={}", buffer.len(), buffer_bytes); Self::flush_buffer(&core, &mut buffer, &mut buffer_bytes, &err_sender).await; last_flush = tokio::time::Instant::now(); } @@ -150,6 +151,7 @@ impl Shard { } _ = tokio::time::sleep_until(deadline) => { if !buffer.is_empty() { + println!("flush by timeout: buffer.len()={}, buffer_bytes={}", buffer.len(), buffer_bytes); Self::flush_buffer(&core, &mut buffer, &mut buffer_bytes, &err_sender).await; last_flush = tokio::time::Instant::now(); } @@ -180,37 +182,41 @@ impl Shard { err_sender: &flume::Sender<ErrorCtx>, ) { let start = Instant::now(); + println!("buffer size: {}", buffer.len()); // let mut tasks = FuturesUnordered::new(); - for msg in buffer.drain(..) { + for mut msg in buffer.drain(..) { // let core = Arc::clone(core); // let err_sender = err_sender.clone(); // tasks.push(async move { - let result = core - .send_internal(&msg.inner.stream, &msg.inner.topic, msg.inner.messages, msg.inner.partitioning.clone()) - .await; - - if let Err(err) = result { - if let IggyError::ProducerSendFailed { failed, cause } = &err { - let ctx = ErrorCtx { - cause: cause.clone(), - stream: msg.inner.stream, - topic: msg.inner.topic, - partitioning: msg.inner.partitioning, - messages: failed.clone(), - }; - let _ = err_sender.send_async(ctx).await; - } else { - tracing::error!("background send failed: {err}"); + let result = core + .send_internal(&msg.inner.stream, &msg.inner.topic, msg.inner.messages, msg.inner.partitioning.clone()) + .await; + + if let Err(err) = result { + if let IggyError::ProducerSendFailed { failed, cause } = &err { + let ctx = ErrorCtx { + cause: cause.clone(), + stream: msg.inner.stream, + topic: msg.inner.topic, + partitioning: msg.inner.partitioning, + messages: failed.clone(), + }; + let _ = err_sender.send_async(ctx).await; + } else { + tracing::error!("background send failed: {err}"); + } } - } + + // drop(msg._bytes_permit.take().unwrap()); + // drop(msg._slot_permit.take().unwrap()); // }); } // while tasks.next().await.is_some() {} *buffer_bytes = 0; - println!("flush_buffer: {:?}", start.elapsed()) + // println!("flush_buffer: {:?}", start.elapsed()) } pub(crate) async fn send(&self, message: ShardMessageWithPermits) -> Result<(), IggyError> { @@ -225,7 +231,7 @@ impl Shard { IggyError::BackgroundSendError }); - println!("send shard: {:?}", start.elapsed()); + // println!("send shard: {:?}", start.elapsed()); res }
