hubcio commented on code in PR #1838:
URL: https://github.com/apache/iggy/pull/1838#discussion_r2132532321


##########
core/sdk/src/clients/producer_builder.rs:
##########
@@ -15,27 +15,234 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use super::MAX_BATCH_SIZE;
+use super::MAX_BATCH_LENGTH;
+use crate::clients::producer_config::{BackgroundConfig, BackpressureMode, 
SyncConfig};
+use crate::clients::producer_error_callback::ErrorCallback;
+use crate::clients::producer_error_callback::LogErrorCallback;
+use crate::clients::producer_sharding::{BalancedSharding, Sharding};
 use crate::prelude::IggyProducer;
 use iggy_binary_protocol::Client;
 use iggy_common::locking::IggySharedMut;
 use iggy_common::{
-    EncryptorKind, Identifier, IggyDuration, IggyExpiry, MaxTopicSize, 
Partitioner, Partitioning,
+    EncryptorKind, Identifier, IggyByteSize, IggyDuration, IggyExpiry, 
MaxTopicSize, Partitioner,
+    Partitioning,
 };
 use std::sync::Arc;
 
-#[derive(Debug)]
+pub struct BackgroundBuilder {
+    num_shards: Option<usize>,
+    batch_size: Option<usize>,
+    batch_length: Option<usize>,
+    failure_mode: Option<BackpressureMode>,
+    max_buffer_size: Option<IggyByteSize>,
+    linger_time: Option<IggyDuration>,
+    max_in_flight: Option<usize>,
+
+    error_callback: Box<dyn ErrorCallback>,
+    sharding: Box<dyn Sharding>,
+}
+
+impl Default for BackgroundBuilder {
+    fn default() -> Self {
+        let num_shards = default_shard_count();
+        BackgroundBuilder {
+            num_shards: Some(num_shards),
+            sharding: Box::new(BalancedSharding::default()),
+            error_callback: Box::new(LogErrorCallback),
+            batch_size: Some(1_048_576),
+            batch_length: Some(1000),
+            failure_mode: Some(BackpressureMode::Block),
+            max_buffer_size: Some(IggyByteSize::from(32 * 1_048_576)),
+            linger_time: Some(IggyDuration::from(1000)),
+            max_in_flight: Some(num_shards * num_shards),
+        }
+    }
+}
+
+impl BackgroundBuilder {
+    /// Sets the number of messages to batch before sending them, can be 
combined with `interval`.
+    pub fn batch_length(self, batch_length: u32) -> Self {
+        Self {
+            batch_length: if batch_length == 0 {
+                None
+            } else {
+                Some(batch_length.min(MAX_BATCH_LENGTH as u32) as usize)
+            },
+            ..self
+        }
+    }
+
+    /// Clears the batch size.
+    pub fn without_batch_length(self) -> Self {
+        Self {
+            batch_length: None,
+            ..self
+        }
+    }
+
+    /// Sets the interval between sending the messages, can be combined with 
`batch_length`.
+    pub fn linger_time(self, interval: IggyDuration) -> Self {
+        Self {
+            linger_time: Some(interval),
+            ..self
+        }
+    }
+
+    /// Clears the interval.
+    pub fn without_linger_time(self) -> Self {
+        Self {
+            linger_time: None,
+            ..self
+        }
+    }
+
+    /// Sets the number of shards (background workers).
+    pub fn num_shards(self, value: usize) -> Self {
+        Self {
+            num_shards: Some(value),
+            ..self
+        }
+    }
+
+    /// Sets the maximum size of a batch in bytes.
+    pub fn batch_size(self, value: usize) -> Self {
+        Self {
+            batch_size: Some(value),
+            ..self
+        }
+    }
+
+    /// Sets the sharding strategy.
+    /// You can pass a custom implementation that implements the `Sharding` 
trait.
+    pub fn sharding(self, sharding: Box<dyn Sharding>) -> Self {
+        Self { sharding, ..self }
+    }
+
+    /// Sets the maximum buffer size for all in-flight messages (in bytes).
+    pub fn max_buffer_size(self, value: IggyByteSize) -> Self {
+        Self {
+            max_buffer_size: Some(value),
+            ..self
+        }
+    }
+
+    /// Sets the failure mode behavior (e.g., block, fail immediately, 
timeout).
+    pub fn failure_mode(self, mode: BackpressureMode) -> Self {
+        Self {
+            failure_mode: Some(mode),
+            ..self
+        }
+    }
+
+    /// Sets the error callback for handling background sending errors.
+    pub fn error_callback(self, callback: Box<dyn ErrorCallback>) -> Self {
+        Self {
+            error_callback: callback,
+            ..self
+        }
+    }
+
+    /// Sets the maximum number of in-flight batches/messages.
+    pub fn max_in_flight(self, value: usize) -> Self {
+        Self {
+            max_in_flight: Some(value),
+            ..self
+        }
+    }
+
+    pub fn build(self) -> BackgroundConfig {
+        BackgroundConfig {
+            num_shards: self.num_shards.unwrap_or(8),
+            batch_size: self.batch_size,
+            batch_length: self.batch_length,
+            failure_mode: self.failure_mode.unwrap_or(BackpressureMode::Block),
+            max_buffer_size: self.max_buffer_size,
+            linger_time: self.linger_time.unwrap_or(IggyDuration::from(1000)),
+            error_callback: Arc::new(self.error_callback),
+            sharding: self.sharding,
+            max_in_flight: self.max_in_flight,
+        }
+    }
+}
+
+pub struct SyncBuilder {
+    batch_length: Option<usize>,
+    linger_time: Option<IggyDuration>,
+}
+
+impl Default for SyncBuilder {
+    fn default() -> Self {
+        Self {
+            batch_length: Some(1000),
+            linger_time: Some(IggyDuration::from(1000)),
+        }
+    }
+}
+
+impl SyncBuilder {

Review Comment:
   please use bon builder, same as in `IggyMessage`



##########
core/sdk/src/clients/producer_builder.rs:
##########
@@ -15,27 +15,234 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use super::MAX_BATCH_SIZE;
+use super::MAX_BATCH_LENGTH;
+use crate::clients::producer_config::{BackgroundConfig, BackpressureMode, 
SyncConfig};
+use crate::clients::producer_error_callback::ErrorCallback;
+use crate::clients::producer_error_callback::LogErrorCallback;
+use crate::clients::producer_sharding::{BalancedSharding, Sharding};
 use crate::prelude::IggyProducer;
 use iggy_binary_protocol::Client;
 use iggy_common::locking::IggySharedMut;
 use iggy_common::{
-    EncryptorKind, Identifier, IggyDuration, IggyExpiry, MaxTopicSize, 
Partitioner, Partitioning,
+    EncryptorKind, Identifier, IggyByteSize, IggyDuration, IggyExpiry, 
MaxTopicSize, Partitioner,
+    Partitioning,
 };
 use std::sync::Arc;
 
-#[derive(Debug)]
+pub struct BackgroundBuilder {
+    num_shards: Option<usize>,
+    batch_size: Option<usize>,
+    batch_length: Option<usize>,
+    failure_mode: Option<BackpressureMode>,
+    max_buffer_size: Option<IggyByteSize>,
+    linger_time: Option<IggyDuration>,
+    max_in_flight: Option<usize>,
+
+    error_callback: Box<dyn ErrorCallback>,
+    sharding: Box<dyn Sharding>,
+}
+
+impl Default for BackgroundBuilder {
+    fn default() -> Self {
+        let num_shards = default_shard_count();
+        BackgroundBuilder {
+            num_shards: Some(num_shards),

Review Comment:
   what's the reasoning behind sharding?



##########
core/sdk/src/clients/mod.rs:
##########
@@ -32,6 +32,10 @@ pub mod consumer;
 pub mod consumer_builder;
 pub mod producer;
 pub mod producer_builder;
+pub mod producer_config;
+pub mod producer_dispatcher;
+pub mod producer_error_callback;
+pub mod producer_sharding;
 
 const ORDERING: std::sync::atomic::Ordering = 
std::sync::atomic::Ordering::SeqCst;
-const MAX_BATCH_SIZE: usize = 1000000;
+const MAX_BATCH_LENGTH: usize = 1000000;

Review Comment:
   we can send batch which has million messages? 🦀 



##########
core/sdk/src/clients/producer_builder.rs:
##########
@@ -15,27 +15,234 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use super::MAX_BATCH_SIZE;
+use super::MAX_BATCH_LENGTH;
+use crate::clients::producer_config::{BackgroundConfig, BackpressureMode, 
SyncConfig};
+use crate::clients::producer_error_callback::ErrorCallback;
+use crate::clients::producer_error_callback::LogErrorCallback;
+use crate::clients::producer_sharding::{BalancedSharding, Sharding};
 use crate::prelude::IggyProducer;
 use iggy_binary_protocol::Client;
 use iggy_common::locking::IggySharedMut;
 use iggy_common::{
-    EncryptorKind, Identifier, IggyDuration, IggyExpiry, MaxTopicSize, 
Partitioner, Partitioning,
+    EncryptorKind, Identifier, IggyByteSize, IggyDuration, IggyExpiry, 
MaxTopicSize, Partitioner,
+    Partitioning,
 };
 use std::sync::Arc;
 
-#[derive(Debug)]
+pub struct BackgroundBuilder {

Review Comment:
   unfortunately async doesn't sound great in the Rust ecosystem. I think I 
like background.



##########
core/common/src/error/iggy_error.rs:
##########
@@ -16,8 +16,10 @@
  * under the License.
  */
 
-use crate::utils::byte_size::IggyByteSize;
+use std::sync::Arc;
+

Review Comment:
   please merge those empty lines in the `use` ;) i hate that rustfmt doesnt do 
that by default



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to