This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch feat/add-background-send-new
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 28947c5e899d5dbdf2dd3c16f93119101d0a1fbb
Author: haze518 <[email protected]>
AuthorDate: Sun Jun 8 10:41:20 2025 +0600

    del
---
 core/sdk/src/clients/producer.rs                   |   7 +-
 core/sdk/src/clients/producer_builder.rs           | 438 +++++++++++----------
 core/sdk/src/clients/producer_config.rs            |  39 +-
 core/sdk/src/clients/producer_dispatcher.rs        |  13 +-
 .../stream_builder/build/build_iggy_producer.rs    |   3 +-
 5 files changed, 260 insertions(+), 240 deletions(-)

diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index 58f38e2b..1303eecc 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -378,16 +378,13 @@ impl ProducerCoreBackend for ProducerCore {
 
         match &self.sync_config {
             Some(cfg) => {
-                let linger_time_micros = match cfg.linger_time {
-                    Some(t) => t.as_micros(),
-                    None => 0,
-                };
+                let linger_time_micros = cfg.linger_time.as_micros();
                 if linger_time_micros > 0 {
                     Self::wait_before_sending(linger_time_micros, self.last_sent_at.load(ORDERING))
                         .await;
                 }
 
-                let max = cfg.batch_length;
+                let max = cfg.batch_length as usize;
                 let mut index = 0;
                 while index < msgs.len() {
                     let end = (index + max).min(msgs.len());
diff --git a/core/sdk/src/clients/producer_builder.rs b/core/sdk/src/clients/producer_builder.rs
index 431f94ee..1ab5d3fd 100644
--- a/core/sdk/src/clients/producer_builder.rs
+++ b/core/sdk/src/clients/producer_builder.rs
@@ -16,11 +16,12 @@
 // under the License.
 
 use super::MAX_BATCH_LENGTH;
-use crate::clients::producer_config::{BackgroundConfig, BackpressureMode, SyncConfig};
+use crate::clients::producer_config::{BackgroundConfig, BackpressureMode, SyncConfig, SyncConfigBuilder};
 use crate::clients::producer_error_callback::ErrorCallback;
 use crate::clients::producer_error_callback::LogErrorCallback;
 use crate::clients::producer_sharding::{BalancedSharding, Sharding};
 use crate::prelude::IggyProducer;
+use bon::builder;
 use iggy_binary_protocol::Client;
 use iggy_common::locking::IggySharedMut;
 use iggy_common::{
@@ -28,201 +29,202 @@ use iggy_common::{
     Partitioning,
 };
 use std::sync::Arc;
-
-pub struct BackgroundBuilder {
-    num_shards: Option<usize>,
-    batch_size: Option<usize>,
-    batch_length: Option<usize>,
-    failure_mode: Option<BackpressureMode>,
-    max_buffer_size: Option<IggyByteSize>,
-    linger_time: Option<IggyDuration>,
-    max_in_flight: Option<usize>,
-
-    error_callback: Box<dyn ErrorCallback>,
-    sharding: Box<dyn Sharding>,
-}
-
-impl Default for BackgroundBuilder {
-    fn default() -> Self {
-        let num_shards = default_shard_count();
-        BackgroundBuilder {
-            num_shards: Some(num_shards),
-            sharding: Box::new(BalancedSharding::default()),
-            error_callback: Box::new(LogErrorCallback),
-            batch_size: Some(1_048_576),
-            batch_length: Some(1000),
-            failure_mode: Some(BackpressureMode::Block),
-            max_buffer_size: Some(IggyByteSize::from(32 * 1_048_576)),
-            linger_time: Some(IggyDuration::from(1000)),
-            max_in_flight: Some(num_shards * num_shards),
-        }
-    }
-}
-
-impl BackgroundBuilder {
-    /// Sets the number of messages to batch before sending them, can be combined with `interval`.
-    pub fn batch_length(self, batch_length: u32) -> Self {
-        Self {
-            batch_length: if batch_length == 0 {
-                None
-            } else {
-                Some(batch_length.min(MAX_BATCH_LENGTH as u32) as usize)
-            },
-            ..self
-        }
-    }
-
-    /// Clears the batch size.
-    pub fn without_batch_length(self) -> Self {
-        Self {
-            batch_length: None,
-            ..self
-        }
-    }
-
-    /// Sets the interval between sending the messages, can be combined with `batch_length`.
-    pub fn linger_time(self, interval: IggyDuration) -> Self {
-        Self {
-            linger_time: Some(interval),
-            ..self
-        }
-    }
-
-    /// Clears the interval.
-    pub fn without_linger_time(self) -> Self {
-        Self {
-            linger_time: None,
-            ..self
-        }
-    }
-
-    /// Sets the number of shards (background workers).
-    pub fn num_shards(self, value: usize) -> Self {
-        Self {
-            num_shards: Some(value),
-            ..self
-        }
-    }
-
-    /// Sets the maximum size of a batch in bytes.
-    pub fn batch_size(self, value: usize) -> Self {
-        Self {
-            batch_size: Some(value),
-            ..self
-        }
-    }
-
-    /// Sets the sharding strategy.
-    /// You can pass a custom implementation that implements the `Sharding` trait.
-    pub fn sharding(self, sharding: Box<dyn Sharding>) -> Self {
-        Self { sharding, ..self }
-    }
-
-    /// Sets the maximum buffer size for all in-flight messages (in bytes).
-    pub fn max_buffer_size(self, value: IggyByteSize) -> Self {
-        Self {
-            max_buffer_size: Some(value),
-            ..self
-        }
-    }
-
-    /// Sets the failure mode behavior (e.g., block, fail immediately, timeout).
-    pub fn failure_mode(self, mode: BackpressureMode) -> Self {
-        Self {
-            failure_mode: Some(mode),
-            ..self
-        }
-    }
-
-    /// Sets the error callback for handling background sending errors.
-    pub fn error_callback(self, callback: Box<dyn ErrorCallback>) -> Self {
-        Self {
-            error_callback: callback,
-            ..self
-        }
-    }
-
-    /// Sets the maximum number of in-flight batches/messages.
-    pub fn max_in_flight(self, value: usize) -> Self {
-        Self {
-            max_in_flight: Some(value),
-            ..self
-        }
-    }
-
-    pub fn build(self) -> BackgroundConfig {
-        BackgroundConfig {
-            num_shards: self.num_shards.unwrap_or(8),
-            batch_size: self.batch_size,
-            batch_length: self.batch_length,
-            failure_mode: self.failure_mode.unwrap_or(BackpressureMode::Block),
-            max_buffer_size: self.max_buffer_size,
-            linger_time: self.linger_time.unwrap_or(IggyDuration::from(1000)),
-            error_callback: Arc::new(self.error_callback),
-            sharding: self.sharding,
-            max_in_flight: self.max_in_flight,
-        }
-    }
-}
-
-pub struct SyncBuilder {
-    batch_length: Option<usize>,
-    linger_time: Option<IggyDuration>,
-}
-
-impl Default for SyncBuilder {
-    fn default() -> Self {
-        Self {
-            batch_length: Some(1000),
-            linger_time: Some(IggyDuration::from(1000)),
-        }
-    }
-}
-
-impl SyncBuilder {
-    /// Sets the number of messages to batch before sending them, can be combined with `interval`.
-    pub fn batch_length(self, batch_length: u32) -> Self {
-        Self {
-            batch_length: if batch_length == 0 {
-                None
-            } else {
-                Some(batch_length.min(MAX_BATCH_LENGTH as u32) as usize)
-            },
-            ..self
-        }
-    }
-
-    /// Clears the batch size.
-    pub fn without_batch_length(self) -> Self {
-        Self {
-            batch_length: None,
-            ..self
-        }
-    }
-
-    /// Sets the interval between sending the messages, can be combined with `batch_length`.
-    pub fn linger_time(self, interval: IggyDuration) -> Self {
-        Self {
-            linger_time: Some(interval),
-            ..self
-        }
-    }
-
-    /// Clears the interval.
-    pub fn without_linger_time(self) -> Self {
-        Self {
-            linger_time: None,
-            ..self
-        }
-    }
-
-    pub fn build(self) -> SyncConfig {
-        SyncConfig {
-            batch_length: self.batch_length.unwrap_or(MAX_BATCH_LENGTH),
-            linger_time: self.linger_time,
-        }
-    }
-}
+use bon::Builder;
+
+// pub struct BackgroundBuilder {
+//     num_shards: Option<usize>,
+//     batch_size: Option<usize>,
+//     batch_length: Option<usize>,
+//     failure_mode: Option<BackpressureMode>,
+//     max_buffer_size: Option<IggyByteSize>,
+//     linger_time: Option<IggyDuration>,
+//     max_in_flight: Option<usize>,
+//     error_callback: Box<dyn ErrorCallback>,
+//     sharding: Box<dyn Sharding>,
+// }
+
+// impl Default for BackgroundBuilder {
+//     fn default() -> Self {
+//         let num_shards = default_shard_count();
+//         BackgroundBuilder {
+//             num_shards: Some(num_shards),
+//             sharding: Box::new(BalancedSharding::default()),
+//             error_callback: Box::new(LogErrorCallback),
+//             batch_size: Some(1_048_576),
+//             batch_length: Some(1000),
+//             failure_mode: Some(BackpressureMode::Block),
+//             max_buffer_size: Some(IggyByteSize::from(32 * 1_048_576)),
+//             linger_time: Some(IggyDuration::from(1000)),
+//             max_in_flight: Some(num_shards * num_shards),
+//         }
+//     }
+// }
+
+// impl BackgroundBuilder {
+//     /// Sets the number of messages to batch before sending them, can be combined with `interval`.
+//     pub fn batch_length(self, batch_length: u32) -> Self {
+//         Self {
+//             batch_length: if batch_length == 0 {
+//                 None
+//             } else {
+//                 Some(batch_length.min(MAX_BATCH_LENGTH as u32) as usize)
+//             },
+//             ..self
+//         }
+//     }
+
+//     /// Clears the batch size.
+//     pub fn without_batch_length(self) -> Self {
+//         Self {
+//             batch_length: None,
+//             ..self
+//         }
+//     }
+
+//     /// Sets the interval between sending the messages, can be combined with `batch_length`.
+//     pub fn linger_time(self, interval: IggyDuration) -> Self {
+//         Self {
+//             linger_time: Some(interval),
+//             ..self
+//         }
+//     }
+
+//     /// Clears the interval.
+//     pub fn without_linger_time(self) -> Self {
+//         Self {
+//             linger_time: None,
+//             ..self
+//         }
+//     }
+
+//     /// Sets the number of shards (background workers).
+//     pub fn num_shards(self, value: usize) -> Self {
+//         Self {
+//             num_shards: Some(value),
+//             ..self
+//         }
+//     }
+
+//     /// Sets the maximum size of a batch in bytes.
+//     pub fn batch_size(self, value: usize) -> Self {
+//         Self {
+//             batch_size: Some(value),
+//             ..self
+//         }
+//     }
+
+//     /// Sets the sharding strategy.
+//     /// You can pass a custom implementation that implements the `Sharding` trait.
+//     pub fn sharding(self, sharding: Box<dyn Sharding>) -> Self {
+//         Self { sharding, ..self }
+//     }
+
+//     /// Sets the maximum buffer size for all in-flight messages (in bytes).
+//     pub fn max_buffer_size(self, value: IggyByteSize) -> Self {
+//         Self {
+//             max_buffer_size: Some(value),
+//             ..self
+//         }
+//     }
+
+//     /// Sets the failure mode behavior (e.g., block, fail immediately, timeout).
+//     pub fn failure_mode(self, mode: BackpressureMode) -> Self {
+//         Self {
+//             failure_mode: Some(mode),
+//             ..self
+//         }
+//     }
+
+//     /// Sets the error callback for handling background sending errors.
+//     pub fn error_callback(self, callback: Box<dyn ErrorCallback>) -> Self {
+//         Self {
+//             error_callback: callback,
+//             ..self
+//         }
+//     }
+
+//     /// Sets the maximum number of in-flight batches/messages.
+//     pub fn max_in_flight(self, value: usize) -> Self {
+//         Self {
+//             max_in_flight: Some(value),
+//             ..self
+//         }
+//     }
+
+//     pub fn build(self) -> BackgroundConfig {
+//         BackgroundConfig {
+//             num_shards: self.num_shards.unwrap_or(8),
+//             batch_size: self.batch_size,
+//             batch_length: self.batch_length,
+//             failure_mode: self.failure_mode.unwrap_or(BackpressureMode::Block),
+//             max_buffer_size: self.max_buffer_size,
+//             linger_time: self.linger_time.unwrap_or(IggyDuration::from(1000)),
+//             error_callback: Arc::new(self.error_callback),
+//             sharding: self.sharding,
+//             max_in_flight: self.max_in_flight,
+//         }
+//     }
+// }
+
+// #[derive(Builder)]
+// pub struct SyncBuilder {
+//     batch_length: Option<usize>,
+//     linger_time: Option<IggyDuration>,
+// }
+
+// impl Default for SyncBuilder {
+//     fn default() -> Self {
+//         Self {
+//             batch_length: Some(1000),
+//             linger_time: Some(IggyDuration::from(1000)),
+//         }
+//     }
+// }
+
+// impl SyncBuilder {
+//     /// Sets the number of messages to batch before sending them, can be combined with `interval`.
+//     pub fn batch_length(self, batch_length: u32) -> Self {
+//         Self {
+//             batch_length: if batch_length == 0 {
+//                 None
+//             } else {
+//                 Some(batch_length.min(MAX_BATCH_LENGTH as u32) as usize)
+//             },
+//             ..self
+//         }
+//     }
+
+//     /// Clears the batch size.
+//     pub fn without_batch_length(self) -> Self {
+//         Self {
+//             batch_length: None,
+//             ..self
+//         }
+//     }
+
+//     /// Sets the interval between sending the messages, can be combined with `batch_length`.
+//     pub fn linger_time(self, interval: IggyDuration) -> Self {
+//         Self {
+//             linger_time: Some(interval),
+//             ..self
+//         }
+//     }
+
+//     /// Clears the interval.
+//     pub fn without_linger_time(self) -> Self {
+//         Self {
+//             linger_time: None,
+//             ..self
+//         }
+//     }
+
+//     pub fn build(self) -> SyncConfig {
+//         SyncConfig {
+//             batch_length: self.batch_length.unwrap_or(MAX_BATCH_LENGTH),
+//             linger_time: self.linger_time,
+//         }
+//     }
+// }
 
 pub enum SendMode {
     Sync(SyncConfig),
@@ -231,7 +233,7 @@ pub enum SendMode {
 
 impl Default for SendMode {
     fn default() -> Self {
-        SendMode::Sync(SyncBuilder::default().build())
+        SendMode::Sync(SyncConfig::builder().build())
     }
 }
 
@@ -398,20 +400,30 @@ impl IggyProducerBuilder {
         }
     }
 
+    pub fn sync(mut self, config: SyncConfig) -> Self {
+        self.mode = SendMode::Sync(config);
+        self
+    }
+
+    pub fn background(mut self, config: BackgroundConfig) -> Self {
+        self.mode = SendMode::Background(config);
+        self
+    }
+
     /// Configures the producer to use synchronous (immediate) sending mode.
     ///
     /// In sync mode, messages are sent immediately on `.send()` without background buffering.
     ///
     /// # Arguments
     /// * `f` - A closure that modifies the `SyncBuilder` configuration.
-    pub fn sync<F>(mut self, f: F) -> Self
-    where
-        F: FnOnce(SyncBuilder) -> SyncBuilder,
-    {
-        let cfg = f(SyncBuilder::default()).build();
-        self.mode = SendMode::Sync(cfg);
-        self
-    }
+    // pub fn sync<F>(mut self, f: F) -> Self
+    // where
+    //     F: FnOnce(SyncConfigBuilder) -> SyncConfigBuilder,
+    // {
+    //     let cfg = f(SyncConfig::builder()).build();
+    //     self.mode = SendMode::Sync(cfg);
+    //     self
+    // }
 
     /// Configures the producer to use background (asynchronous) sending mode.
     ///
@@ -419,14 +431,14 @@ impl IggyProducerBuilder {
     ///
     /// # Arguments
     /// * `f` - A closure that modifies the `BackgroundBuilder` configuration.
-    pub fn background<F>(mut self, f: F) -> Self
-    where
-        F: FnOnce(BackgroundBuilder) -> BackgroundBuilder,
-    {
-        let cfg = f(BackgroundBuilder::default()).build();
-        self.mode = SendMode::Background(cfg);
-        self
-    }
+    // pub fn background<F>(mut self, f: F) -> Self
+    // where
+    //     F: FnOnce(BackgroundBuilder) -> BackgroundBuilder,
+    // {
+    //     let cfg = f(BackgroundBuilder::default()).build();
+    //     self.mode = SendMode::Background(cfg);
+    //     self
+    // }
 
     pub fn build(self) -> IggyProducer {
         IggyProducer::new(
diff --git a/core/sdk/src/clients/producer_config.rs b/core/sdk/src/clients/producer_config.rs
index 0ba0e6ef..2848e986 100644
--- a/core/sdk/src/clients/producer_config.rs
+++ b/core/sdk/src/clients/producer_config.rs
@@ -17,10 +17,11 @@
  */
 use std::sync::Arc;
 
+use bon::Builder;
 use iggy_common::{IggyByteSize, IggyDuration};
 
-use crate::clients::producer_error_callback::ErrorCallback;
-use crate::clients::producer_sharding::Sharding;
+use crate::clients::producer_error_callback::{ErrorCallback, LogErrorCallback};
+use crate::clients::producer_sharding::{BalancedSharding, Sharding};
 
 #[derive(Debug, Clone)]
 /// Determines how the `send_messages` API should behave when problem is encountered
@@ -33,21 +34,37 @@ pub enum BackpressureMode {
     FailImmediately,
 }
 
-#[derive(Debug)]
+#[derive(Debug, Builder)]
 pub struct BackgroundConfig {
+    #[builder(default = default_shard_count())]
     pub num_shards: usize,
-    pub batch_size: Option<usize>,
-    pub batch_length: Option<usize>,
-    pub failure_mode: BackpressureMode,
-    pub max_buffer_size: Option<IggyByteSize>,
-    pub max_in_flight: Option<usize>,
+    #[builder(default = IggyDuration::from(1000))]
     pub linger_time: IggyDuration,
+    #[builder(default = Arc::new(Box::new(LogErrorCallback)))]
     pub error_callback: Arc<Box<dyn ErrorCallback + Send + Sync>>,
+    #[builder(default = Box::new(BalancedSharding::default()))]
     pub sharding: Box<dyn Sharding + Send + Sync>,
+    #[builder(default = 1_048_576)]
+    pub batch_size: usize,
+    #[builder(default = 1000)]
+    pub batch_length: usize,
+    #[builder(default = BackpressureMode::Block)]
+    pub failure_mode: BackpressureMode,
+    #[builder(default = IggyByteSize::from(32 * 1_048_576))]
+    pub max_buffer_size: IggyByteSize,
+    #[builder(default = default_shard_count() * 2)]
+    pub max_in_flight: usize,
 }
 
-#[derive(Clone)]
+#[derive(Clone, Builder)]
 pub struct SyncConfig {
-    pub batch_length: usize,
-    pub linger_time: Option<IggyDuration>,
+    #[builder(default = 1000)]
+    pub batch_length: u32,
+    #[builder(default = IggyDuration::from(1000))]
+    pub linger_time: IggyDuration,
+}
+
+fn default_shard_count() -> usize {
+    let cpus = num_cpus::get();
+    cpus.clamp(2, 16)
 }
diff --git a/core/sdk/src/clients/producer_dispatcher.rs b/core/sdk/src/clients/producer_dispatcher.rs
index c56df46d..12bb4e8a 100644
--- a/core/sdk/src/clients/producer_dispatcher.rs
+++ b/core/sdk/src/clients/producer_dispatcher.rs
@@ -74,21 +74,15 @@ impl ProducerDispatcher {
             shards.push(Shard::new(core.clone(), config.clone(), err_tx.clone()));
         }
 
-        let bytes_permit = match config.max_buffer_size {
-            Some(val) => val.as_bytes_u32() as usize,
-            None => usize::MAX,
-        };
-        let slot_permit = match config.max_in_flight {
-            Some(val) => val,
-            None => usize::MAX,
-        };
+        let bytes_permit = config.max_buffer_size.as_bytes_usize();
+        let max_in_flight = config.max_in_flight;
 
         Self {
             shards,
             config,
             closed: AtomicBool::new(false),
             bytes_permit: Arc::new(Semaphore::new(bytes_permit)),
-            slots_permit: Arc::new(Semaphore::new(slot_permit)),
+            slots_permit: Arc::new(Semaphore::new(max_in_flight)),
             shutdown_notify,
             _join_handle: handle,
         }
@@ -222,7 +216,6 @@ mod tests {
     use tokio::time::sleep;
 
     use crate::clients::producer::MockProducerCoreBackend;
-    use crate::clients::producer_builder::BackgroundBuilder;
     use crate::clients::producer_error_callback::ErrorCallback;
     use crate::clients::producer_sharding::Sharding;
 
diff --git a/core/sdk/src/stream_builder/build/build_iggy_producer.rs b/core/sdk/src/stream_builder/build/build_iggy_producer.rs
index afe8d122..6eabe919 100644
--- a/core/sdk/src/stream_builder/build/build_iggy_producer.rs
+++ b/core/sdk/src/stream_builder/build/build_iggy_producer.rs
@@ -18,6 +18,7 @@
 
 use crate::clients::client::IggyClient;
 use crate::clients::producer::IggyProducer;
+use crate::clients::producer_config::SyncConfig;
 use crate::prelude::{IggyError, IggyExpiry, MaxTopicSize};
 use crate::stream_builder::IggyProducerConfig;
 use tracing::{error, trace};
@@ -65,7 +66,7 @@ pub(crate) async fn build_iggy_producer(
             IggyExpiry::ServerDefault,
             MaxTopicSize::ServerDefault,
         )
-        .sync(|b| b.batch_length(batch_length).linger_time(linger_time));
+        .sync(SyncConfig::builder().batch_length(batch_length).linger_time(linger_time).build());
 
     if let Some(encryptor) = config.encryptor() {
         builder = builder.encryptor(encryptor);

Reply via email to