haze518 commented on code in PR #1838:
URL: https://github.com/apache/iggy/pull/1838#discussion_r2134992602
##########
core/sdk/src/clients/producer_builder.rs:
##########
@@ -15,27 +15,234 @@
// specific language governing permissions and limitations
// under the License.
-use super::MAX_BATCH_SIZE;
+use super::MAX_BATCH_LENGTH;
+use crate::clients::producer_config::{BackgroundConfig, BackpressureMode,
SyncConfig};
+use crate::clients::producer_error_callback::ErrorCallback;
+use crate::clients::producer_error_callback::LogErrorCallback;
+use crate::clients::producer_sharding::{BalancedSharding, Sharding};
use crate::prelude::IggyProducer;
use iggy_binary_protocol::Client;
use iggy_common::locking::IggySharedMut;
use iggy_common::{
- EncryptorKind, Identifier, IggyDuration, IggyExpiry, MaxTopicSize,
Partitioner, Partitioning,
+ EncryptorKind, Identifier, IggyByteSize, IggyDuration, IggyExpiry,
MaxTopicSize, Partitioner,
+ Partitioning,
};
use std::sync::Arc;
-#[derive(Debug)]
+pub struct BackgroundBuilder {
+ num_shards: Option<usize>,
+ batch_size: Option<usize>,
+ batch_length: Option<usize>,
+ failure_mode: Option<BackpressureMode>,
+ max_buffer_size: Option<IggyByteSize>,
+ linger_time: Option<IggyDuration>,
+ max_in_flight: Option<usize>,
+
+ error_callback: Box<dyn ErrorCallback>,
+ sharding: Box<dyn Sharding>,
+}
+
+impl Default for BackgroundBuilder {
+ fn default() -> Self {
+ let num_shards = default_shard_count();
+ BackgroundBuilder {
+ num_shards: Some(num_shards),
+ sharding: Box::new(BalancedSharding::default()),
+ error_callback: Box::new(LogErrorCallback),
+ batch_size: Some(1_048_576),
+ batch_length: Some(1000),
+ failure_mode: Some(BackpressureMode::Block),
+ max_buffer_size: Some(IggyByteSize::from(32 * 1_048_576)),
+ linger_time: Some(IggyDuration::from(1000)),
+ max_in_flight: Some(num_shards * num_shards),
+ }
+ }
+}
+
+impl BackgroundBuilder {
+ /// Sets the number of messages to batch before sending them, can be
combined with `interval`.
+ pub fn batch_length(self, batch_length: u32) -> Self {
+ Self {
+ batch_length: if batch_length == 0 {
+ None
+ } else {
+ Some(batch_length.min(MAX_BATCH_LENGTH as u32) as usize)
+ },
+ ..self
+ }
+ }
+
+ /// Clears the batch size.
+ pub fn without_batch_length(self) -> Self {
+ Self {
+ batch_length: None,
+ ..self
+ }
+ }
+
+ /// Sets the interval between sending the messages, can be combined with
`batch_length`.
+ pub fn linger_time(self, interval: IggyDuration) -> Self {
+ Self {
+ linger_time: Some(interval),
+ ..self
+ }
+ }
+
+ /// Clears the interval.
+ pub fn without_linger_time(self) -> Self {
+ Self {
+ linger_time: None,
+ ..self
+ }
+ }
+
+ /// Sets the number of shards (background workers).
+ pub fn num_shards(self, value: usize) -> Self {
+ Self {
+ num_shards: Some(value),
+ ..self
+ }
+ }
+
+ /// Sets the maximum size of a batch in bytes.
+ pub fn batch_size(self, value: usize) -> Self {
+ Self {
+ batch_size: Some(value),
+ ..self
+ }
+ }
+
+ /// Sets the sharding strategy.
+ /// You can pass a custom implementation that implements the `Sharding`
trait.
+ pub fn sharding(self, sharding: Box<dyn Sharding>) -> Self {
+ Self { sharding, ..self }
+ }
+
+ /// Sets the maximum buffer size for all in-flight messages (in bytes).
+ pub fn max_buffer_size(self, value: IggyByteSize) -> Self {
+ Self {
+ max_buffer_size: Some(value),
+ ..self
+ }
+ }
+
+ /// Sets the failure mode behavior (e.g., block, fail immediately,
timeout).
+ pub fn failure_mode(self, mode: BackpressureMode) -> Self {
+ Self {
+ failure_mode: Some(mode),
+ ..self
+ }
+ }
+
+ /// Sets the error callback for handling background sending errors.
+ pub fn error_callback(self, callback: Box<dyn ErrorCallback>) -> Self {
+ Self {
+ error_callback: callback,
+ ..self
+ }
+ }
+
+ /// Sets the maximum number of in-flight batches/messages.
+ pub fn max_in_flight(self, value: usize) -> Self {
+ Self {
+ max_in_flight: Some(value),
+ ..self
+ }
+ }
+
+ pub fn build(self) -> BackgroundConfig {
+ BackgroundConfig {
+ num_shards: self.num_shards.unwrap_or(8),
+ batch_size: self.batch_size,
+ batch_length: self.batch_length,
+ failure_mode: self.failure_mode.unwrap_or(BackpressureMode::Block),
+ max_buffer_size: self.max_buffer_size,
+ linger_time: self.linger_time.unwrap_or(IggyDuration::from(1000)),
+ error_callback: Arc::new(self.error_callback),
+ sharding: self.sharding,
+ max_in_flight: self.max_in_flight,
+ }
+ }
+}
+
+pub struct SyncBuilder {
+ batch_length: Option<usize>,
+ linger_time: Option<IggyDuration>,
+}
+
+impl Default for SyncBuilder {
+ fn default() -> Self {
+ Self {
+ batch_length: Some(1000),
+ linger_time: Some(IggyDuration::from(1000)),
+ }
+ }
+}
+
+impl SyncBuilder {
Review Comment:
removed SyncBuilder and BackgroundBuilder, and added bon for SyncConfig /
BackgroundConfig
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]