This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch feat/add-background-send-new in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 28947c5e899d5dbdf2dd3c16f93119101d0a1fbb Author: haze518 <[email protected]> AuthorDate: Sun Jun 8 10:41:20 2025 +0600 del --- core/sdk/src/clients/producer.rs | 7 +- core/sdk/src/clients/producer_builder.rs | 438 +++++++++++---------- core/sdk/src/clients/producer_config.rs | 39 +- core/sdk/src/clients/producer_dispatcher.rs | 13 +- .../stream_builder/build/build_iggy_producer.rs | 3 +- 5 files changed, 260 insertions(+), 240 deletions(-) diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs index 58f38e2b..1303eecc 100644 --- a/core/sdk/src/clients/producer.rs +++ b/core/sdk/src/clients/producer.rs @@ -378,16 +378,13 @@ impl ProducerCoreBackend for ProducerCore { match &self.sync_config { Some(cfg) => { - let linger_time_micros = match cfg.linger_time { - Some(t) => t.as_micros(), - None => 0, - }; + let linger_time_micros = cfg.linger_time.as_micros(); if linger_time_micros > 0 { Self::wait_before_sending(linger_time_micros, self.last_sent_at.load(ORDERING)) .await; } - let max = cfg.batch_length; + let max = cfg.batch_length as usize; let mut index = 0; while index < msgs.len() { let end = (index + max).min(msgs.len()); diff --git a/core/sdk/src/clients/producer_builder.rs b/core/sdk/src/clients/producer_builder.rs index 431f94ee..1ab5d3fd 100644 --- a/core/sdk/src/clients/producer_builder.rs +++ b/core/sdk/src/clients/producer_builder.rs @@ -16,11 +16,12 @@ // under the License. use super::MAX_BATCH_LENGTH; -use crate::clients::producer_config::{BackgroundConfig, BackpressureMode, SyncConfig}; +use crate::clients::producer_config::{BackgroundConfig, BackpressureMode, SyncConfig, SyncConfigBuilder}; use crate::clients::producer_error_callback::ErrorCallback; use crate::clients::producer_error_callback::LogErrorCallback; use crate::clients::producer_sharding::{BalancedSharding, Sharding}; use crate::prelude::IggyProducer; +use bon::builder; use iggy_binary_protocol::Client; use iggy_common::locking::IggySharedMut; use iggy_common::{ @@ -28,201 +29,202 @@ use iggy_common::{ Partitioning, }; use std::sync::Arc; - -pub struct BackgroundBuilder { - num_shards: Option<usize>, - batch_size: Option<usize>, - batch_length: Option<usize>, - failure_mode: Option<BackpressureMode>, - max_buffer_size: Option<IggyByteSize>, - linger_time: Option<IggyDuration>, - max_in_flight: Option<usize>, - - error_callback: Box<dyn ErrorCallback>, - sharding: Box<dyn Sharding>, -} - -impl Default for BackgroundBuilder { - fn default() -> Self { - let num_shards = default_shard_count(); - BackgroundBuilder { - num_shards: Some(num_shards), - sharding: Box::new(BalancedSharding::default()), - error_callback: Box::new(LogErrorCallback), - batch_size: Some(1_048_576), - batch_length: Some(1000), - failure_mode: Some(BackpressureMode::Block), - max_buffer_size: Some(IggyByteSize::from(32 * 1_048_576)), - linger_time: Some(IggyDuration::from(1000)), - max_in_flight: Some(num_shards * num_shards), - } - } -} - -impl BackgroundBuilder { - /// Sets the number of messages to batch before sending them, can be combined with `interval`. - pub fn batch_length(self, batch_length: u32) -> Self { - Self { - batch_length: if batch_length == 0 { - None - } else { - Some(batch_length.min(MAX_BATCH_LENGTH as u32) as usize) - }, - ..self - } - } - - /// Clears the batch size. - pub fn without_batch_length(self) -> Self { - Self { - batch_length: None, - ..self - } - } - - /// Sets the interval between sending the messages, can be combined with `batch_length`. - pub fn linger_time(self, interval: IggyDuration) -> Self { - Self { - linger_time: Some(interval), - ..self - } - } - - /// Clears the interval. - pub fn without_linger_time(self) -> Self { - Self { - linger_time: None, - ..self - } - } - - /// Sets the number of shards (background workers). - pub fn num_shards(self, value: usize) -> Self { - Self { - num_shards: Some(value), - ..self - } - } - - /// Sets the maximum size of a batch in bytes. - pub fn batch_size(self, value: usize) -> Self { - Self { - batch_size: Some(value), - ..self - } - } - - /// Sets the sharding strategy. - /// You can pass a custom implementation that implements the `Sharding` trait. - pub fn sharding(self, sharding: Box<dyn Sharding>) -> Self { - Self { sharding, ..self } - } - - /// Sets the maximum buffer size for all in-flight messages (in bytes). - pub fn max_buffer_size(self, value: IggyByteSize) -> Self { - Self { - max_buffer_size: Some(value), - ..self - } - } - - /// Sets the failure mode behavior (e.g., block, fail immediately, timeout). - pub fn failure_mode(self, mode: BackpressureMode) -> Self { - Self { - failure_mode: Some(mode), - ..self - } - } - - /// Sets the error callback for handling background sending errors. - pub fn error_callback(self, callback: Box<dyn ErrorCallback>) -> Self { - Self { - error_callback: callback, - ..self - } - } - - /// Sets the maximum number of in-flight batches/messages. - pub fn max_in_flight(self, value: usize) -> Self { - Self { - max_in_flight: Some(value), - ..self - } - } - - pub fn build(self) -> BackgroundConfig { - BackgroundConfig { - num_shards: self.num_shards.unwrap_or(8), - batch_size: self.batch_size, - batch_length: self.batch_length, - failure_mode: self.failure_mode.unwrap_or(BackpressureMode::Block), - max_buffer_size: self.max_buffer_size, - linger_time: self.linger_time.unwrap_or(IggyDuration::from(1000)), - error_callback: Arc::new(self.error_callback), - sharding: self.sharding, - max_in_flight: self.max_in_flight, - } - } -} - -pub struct SyncBuilder { - batch_length: Option<usize>, - linger_time: Option<IggyDuration>, -} - -impl Default for SyncBuilder { - fn default() -> Self { - Self { - batch_length: Some(1000), - linger_time: Some(IggyDuration::from(1000)), - } - } -} - -impl SyncBuilder { - /// Sets the number of messages to batch before sending them, can be combined with `interval`. - pub fn batch_length(self, batch_length: u32) -> Self { - Self { - batch_length: if batch_length == 0 { - None - } else { - Some(batch_length.min(MAX_BATCH_LENGTH as u32) as usize) - }, - ..self - } - } - - /// Clears the batch size. - pub fn without_batch_length(self) -> Self { - Self { - batch_length: None, - ..self - } - } - - /// Sets the interval between sending the messages, can be combined with `batch_length`. - pub fn linger_time(self, interval: IggyDuration) -> Self { - Self { - linger_time: Some(interval), - ..self - } - } - - /// Clears the interval. - pub fn without_linger_time(self) -> Self { - Self { - linger_time: None, - ..self - } - } - - pub fn build(self) -> SyncConfig { - SyncConfig { - batch_length: self.batch_length.unwrap_or(MAX_BATCH_LENGTH), - linger_time: self.linger_time, - } - } -} +use bon::Builder; + +// pub struct BackgroundBuilder { +// num_shards: Option<usize>, +// batch_size: Option<usize>, +// batch_length: Option<usize>, +// failure_mode: Option<BackpressureMode>, +// max_buffer_size: Option<IggyByteSize>, +// linger_time: Option<IggyDuration>, +// max_in_flight: Option<usize>, +// error_callback: Box<dyn ErrorCallback>, +// sharding: Box<dyn Sharding>, +// } + +// impl Default for BackgroundBuilder { +// fn default() -> Self { +// let num_shards = default_shard_count(); +// BackgroundBuilder { +// num_shards: Some(num_shards), +// sharding: Box::new(BalancedSharding::default()), +// error_callback: Box::new(LogErrorCallback), +// batch_size: Some(1_048_576), +// batch_length: Some(1000), +// failure_mode: Some(BackpressureMode::Block), +// max_buffer_size: Some(IggyByteSize::from(32 * 1_048_576)), +// linger_time: Some(IggyDuration::from(1000)), +// max_in_flight: Some(num_shards * num_shards), +// } +// } +// } + +// impl BackgroundBuilder { +// /// Sets the number of messages to batch before sending them, can be combined with `interval`. +// pub fn batch_length(self, batch_length: u32) -> Self { +// Self { +// batch_length: if batch_length == 0 { +// None +// } else { +// Some(batch_length.min(MAX_BATCH_LENGTH as u32) as usize) +// }, +// ..self +// } +// } + +// /// Clears the batch size. +// pub fn without_batch_length(self) -> Self { +// Self { +// batch_length: None, +// ..self +// } +// } + +// /// Sets the interval between sending the messages, can be combined with `batch_length`. +// pub fn linger_time(self, interval: IggyDuration) -> Self { +// Self { +// linger_time: Some(interval), +// ..self +// } +// } + +// /// Clears the interval. +// pub fn without_linger_time(self) -> Self { +// Self { +// linger_time: None, +// ..self +// } +// } + +// /// Sets the number of shards (background workers). +// pub fn num_shards(self, value: usize) -> Self { +// Self { +// num_shards: Some(value), +// ..self +// } +// } + +// /// Sets the maximum size of a batch in bytes. +// pub fn batch_size(self, value: usize) -> Self { +// Self { +// batch_size: Some(value), +// ..self +// } +// } + +// /// Sets the sharding strategy. +// /// You can pass a custom implementation that implements the `Sharding` trait. +// pub fn sharding(self, sharding: Box<dyn Sharding>) -> Self { +// Self { sharding, ..self } +// } + +// /// Sets the maximum buffer size for all in-flight messages (in bytes). +// pub fn max_buffer_size(self, value: IggyByteSize) -> Self { +// Self { +// max_buffer_size: Some(value), +// ..self +// } +// } + +// /// Sets the failure mode behavior (e.g., block, fail immediately, timeout). +// pub fn failure_mode(self, mode: BackpressureMode) -> Self { +// Self { +// failure_mode: Some(mode), +// ..self +// } +// } + +// /// Sets the error callback for handling background sending errors. +// pub fn error_callback(self, callback: Box<dyn ErrorCallback>) -> Self { +// Self { +// error_callback: callback, +// ..self +// } +// } + +// /// Sets the maximum number of in-flight batches/messages. +// pub fn max_in_flight(self, value: usize) -> Self { +// Self { +// max_in_flight: Some(value), +// ..self +// } +// } + +// pub fn build(self) -> BackgroundConfig { +// BackgroundConfig { +// num_shards: self.num_shards.unwrap_or(8), +// batch_size: self.batch_size, +// batch_length: self.batch_length, +// failure_mode: self.failure_mode.unwrap_or(BackpressureMode::Block), +// max_buffer_size: self.max_buffer_size, +// linger_time: self.linger_time.unwrap_or(IggyDuration::from(1000)), +// error_callback: Arc::new(self.error_callback), +// sharding: self.sharding, +// max_in_flight: self.max_in_flight, +// } +// } +// } + +// #[derive(Builder)] +// pub struct SyncBuilder { +// batch_length: Option<usize>, +// linger_time: Option<IggyDuration>, +// } + +// impl Default for SyncBuilder { +// fn default() -> Self { +// Self { +// batch_length: Some(1000), +// linger_time: Some(IggyDuration::from(1000)), +// } +// } +// } + +// impl SyncBuilder { +// /// Sets the number of messages to batch before sending them, can be combined with `interval`. +// pub fn batch_length(self, batch_length: u32) -> Self { +// Self { +// batch_length: if batch_length == 0 { +// None +// } else { +// Some(batch_length.min(MAX_BATCH_LENGTH as u32) as usize) +// }, +// ..self +// } +// } + +// /// Clears the batch size. +// pub fn without_batch_length(self) -> Self { +// Self { +// batch_length: None, +// ..self +// } +// } + +// /// Sets the interval between sending the messages, can be combined with `batch_length`. +// pub fn linger_time(self, interval: IggyDuration) -> Self { +// Self { +// linger_time: Some(interval), +// ..self +// } +// } + +// /// Clears the interval. +// pub fn without_linger_time(self) -> Self { +// Self { +// linger_time: None, +// ..self +// } +// } + +// pub fn build(self) -> SyncConfig { +// SyncConfig { +// batch_length: self.batch_length.unwrap_or(MAX_BATCH_LENGTH), +// linger_time: self.linger_time, +// } +// } +// } pub enum SendMode { Sync(SyncConfig), @@ -231,7 +233,7 @@ pub enum SendMode { impl Default for SendMode { fn default() -> Self { - SendMode::Sync(SyncBuilder::default().build()) + SendMode::Sync(SyncConfig::builder().build()) } } @@ -398,20 +400,30 @@ impl IggyProducerBuilder { } } + pub fn sync(mut self, config: SyncConfig) -> Self { + self.mode = SendMode::Sync(config); + self + } + + pub fn background(mut self, config: BackgroundConfig) -> Self { + self.mode = SendMode::Background(config); + self + } + /// Configures the producer to use synchronous (immediate) sending mode. /// /// In sync mode, messages are sent immediately on `.send()` without background buffering. /// /// # Arguments /// * `f` - A closure that modifies the `SyncBuilder` configuration. - pub fn sync<F>(mut self, f: F) -> Self - where - F: FnOnce(SyncBuilder) -> SyncBuilder, - { - let cfg = f(SyncBuilder::default()).build(); - self.mode = SendMode::Sync(cfg); - self - } + // pub fn sync<F>(mut self, f: F) -> Self + // where + // F: FnOnce(SyncConfigBuilder) -> SyncConfigBuilder, + // { + // let cfg = f(SyncConfig::builder()).build(); + // self.mode = SendMode::Sync(cfg); + // self + // } /// Configures the producer to use background (asynchronous) sending mode. /// @@ -419,14 +431,14 @@ impl IggyProducerBuilder { /// /// # Arguments /// * `f` - A closure that modifies the `BackgroundBuilder` configuration. - pub fn background<F>(mut self, f: F) -> Self - where - F: FnOnce(BackgroundBuilder) -> BackgroundBuilder, - { - let cfg = f(BackgroundBuilder::default()).build(); - self.mode = SendMode::Background(cfg); - self - } + // pub fn background<F>(mut self, f: F) -> Self + // where + // F: FnOnce(BackgroundBuilder) -> BackgroundBuilder, + // { + // let cfg = f(BackgroundBuilder::default()).build(); + // self.mode = SendMode::Background(cfg); + // self + // } pub fn build(self) -> IggyProducer { IggyProducer::new( diff --git a/core/sdk/src/clients/producer_config.rs b/core/sdk/src/clients/producer_config.rs index 0ba0e6ef..2848e986 100644 --- a/core/sdk/src/clients/producer_config.rs +++ b/core/sdk/src/clients/producer_config.rs @@ -17,10 +17,11 @@ */ use std::sync::Arc; +use bon::Builder; use iggy_common::{IggyByteSize, IggyDuration}; -use crate::clients::producer_error_callback::ErrorCallback; -use crate::clients::producer_sharding::Sharding; +use crate::clients::producer_error_callback::{ErrorCallback, LogErrorCallback}; +use crate::clients::producer_sharding::{BalancedSharding, Sharding}; #[derive(Debug, Clone)] /// Determines how the `send_messages` API should behave when problem is encountered @@ -33,21 +34,37 @@ pub enum BackpressureMode { FailImmediately, } -#[derive(Debug)] +#[derive(Debug, Builder)] pub struct BackgroundConfig { + #[builder(default = default_shard_count())] pub num_shards: usize, - pub batch_size: Option<usize>, - pub batch_length: Option<usize>, - pub failure_mode: BackpressureMode, - pub max_buffer_size: Option<IggyByteSize>, - pub max_in_flight: Option<usize>, + #[builder(default = IggyDuration::from(1000))] pub linger_time: IggyDuration, + #[builder(default = Arc::new(Box::new(LogErrorCallback)))] pub error_callback: Arc<Box<dyn ErrorCallback + Send + Sync>>, + #[builder(default = Box::new(BalancedSharding::default()))] pub sharding: Box<dyn Sharding + Send + Sync>, + #[builder(default = 1_048_576)] + pub batch_size: usize, + #[builder(default = 1000)] + pub batch_length: usize, + #[builder(default = BackpressureMode::Block)] + pub failure_mode: BackpressureMode, + #[builder(default = IggyByteSize::from(32 * 1_048_576))] + pub max_buffer_size: IggyByteSize, + #[builder(default = default_shard_count() * 2)] + pub max_in_flight: usize, } -#[derive(Clone)] +#[derive(Clone, Builder)] pub struct SyncConfig { - pub batch_length: usize, - pub linger_time: Option<IggyDuration>, + #[builder(default = 1000)] + pub batch_length: u32, + #[builder(default = IggyDuration::from(1000))] + pub linger_time: IggyDuration, +} + +fn default_shard_count() -> usize { + let cpus = num_cpus::get(); + cpus.clamp(2, 16) } diff --git a/core/sdk/src/clients/producer_dispatcher.rs b/core/sdk/src/clients/producer_dispatcher.rs index c56df46d..12bb4e8a 100644 --- a/core/sdk/src/clients/producer_dispatcher.rs +++ b/core/sdk/src/clients/producer_dispatcher.rs @@ -74,21 +74,15 @@ impl ProducerDispatcher { shards.push(Shard::new(core.clone(), config.clone(), err_tx.clone())); } - let bytes_permit = match config.max_buffer_size { - Some(val) => val.as_bytes_u32() as usize, - None => usize::MAX, - }; - let slot_permit = match config.max_in_flight { - Some(val) => val, - None => usize::MAX, - }; + let bytes_permit = config.max_buffer_size.as_bytes_usize(); + let max_in_flight = config.max_in_flight; Self { shards, config, closed: AtomicBool::new(false), bytes_permit: Arc::new(Semaphore::new(bytes_permit)), - slots_permit: Arc::new(Semaphore::new(slot_permit)), + slots_permit: Arc::new(Semaphore::new(max_in_flight)), shutdown_notify, _join_handle: handle, } @@ -222,7 +216,6 @@ mod tests { use tokio::time::sleep; use crate::clients::producer::MockProducerCoreBackend; - use crate::clients::producer_builder::BackgroundBuilder; use crate::clients::producer_error_callback::ErrorCallback; use crate::clients::producer_sharding::Sharding; diff --git a/core/sdk/src/stream_builder/build/build_iggy_producer.rs b/core/sdk/src/stream_builder/build/build_iggy_producer.rs index afe8d122..6eabe919 100644 --- a/core/sdk/src/stream_builder/build/build_iggy_producer.rs +++ b/core/sdk/src/stream_builder/build/build_iggy_producer.rs @@ -18,6 +18,7 @@ use crate::clients::client::IggyClient; use crate::clients::producer::IggyProducer; +use crate::clients::producer_config::SyncConfig; use crate::prelude::{IggyError, IggyExpiry, MaxTopicSize}; use crate::stream_builder::IggyProducerConfig; use tracing::{error, trace}; @@ -65,7 +66,7 @@ pub(crate) async fn build_iggy_producer( IggyExpiry::ServerDefault, MaxTopicSize::ServerDefault, ) - .sync(|b| b.batch_length(batch_length).linger_time(linger_time)); + .sync(SyncConfig::builder().batch_length(batch_length).linger_time(linger_time).build()); if let Some(encryptor) = config.encryptor() { builder = builder.encryptor(encryptor);
