spetz commented on code in PR #1838:
URL: https://github.com/apache/iggy/pull/1838#discussion_r2132135842


##########
core/sdk/src/clients/producer_builder.rs:
##########
@@ -226,21 +398,46 @@ impl IggyProducerBuilder {
         }
     }
 
-    /// Builds the producer.
+    /// Configures the producer to use synchronous (immediate) sending mode.
     ///
-    /// Note: After building the producer, `init()` must be invoked before 
producing messages.
+    /// In sync mode, messages are sent immediately on `.send()` without 
background buffering.
+    ///
+    /// # Arguments
+    /// * `f` - A closure that modifies the `SyncBuilder` configuration.
+    pub fn sync<F>(mut self, f: F) -> Self
+    where
+        F: FnOnce(SyncBuilder) -> SyncBuilder,
+    {
+        let cfg = f(SyncBuilder::default()).build();
+        self.mode = SendMode::Sync(cfg);
+        self
+    }
+
+    /// Configures the producer to use background (asynchronous) sending mode.
+    ///
+    /// In background mode, messages are buffered and sent in batches via 
background tasks.
+    ///
+    /// # Arguments
+    /// * `f` - A closure that modifies the `BackgroundBuilder` configuration.
+    pub fn background<F>(mut self, f: F) -> Self

Review Comment:
   Same here regarding the `f` argument: let's give it a more meaningful name.



##########
core/sdk/src/clients/producer_builder.rs:
##########
@@ -249,6 +446,12 @@ impl IggyProducerBuilder {
             self.topic_max_size,
             self.send_retries_count,
             self.send_retries_interval,
+            self.mode,
         )
     }
 }
+
+fn default_shard_count() -> usize {
+    let cpus = num_cpus::get();
+    cpus.clamp(2, 16)

Review Comment:
   Would it be possible to go above 16 as the max value, e.g. on a very 
powerful machine?



##########
core/sdk/src/clients/producer_builder.rs:
##########
@@ -15,27 +15,234 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use super::MAX_BATCH_SIZE;
+use super::MAX_BATCH_LENGTH;
+use crate::clients::producer_config::{BackgroundConfig, BackpressureMode, 
SyncConfig};
+use crate::clients::producer_error_callback::ErrorCallback;
+use crate::clients::producer_error_callback::LogErrorCallback;
+use crate::clients::producer_sharding::{BalancedSharding, Sharding};
 use crate::prelude::IggyProducer;
 use iggy_binary_protocol::Client;
 use iggy_common::locking::IggySharedMut;
 use iggy_common::{
-    EncryptorKind, Identifier, IggyDuration, IggyExpiry, MaxTopicSize, 
Partitioner, Partitioning,
+    EncryptorKind, Identifier, IggyByteSize, IggyDuration, IggyExpiry, 
MaxTopicSize, Partitioner,
+    Partitioning,
 };
 use std::sync::Arc;
 
-#[derive(Debug)]
+pub struct BackgroundBuilder {
+    num_shards: Option<usize>,
+    batch_size: Option<usize>,
+    batch_length: Option<usize>,
+    failure_mode: Option<BackpressureMode>,
+    max_buffer_size: Option<IggyByteSize>,
+    linger_time: Option<IggyDuration>,
+    max_in_flight: Option<usize>,
+
+    error_callback: Box<dyn ErrorCallback>,
+    sharding: Box<dyn Sharding>,
+}
+
+impl Default for BackgroundBuilder {
+    fn default() -> Self {
+        let num_shards = default_shard_count();
+        BackgroundBuilder {
+            num_shards: Some(num_shards),
+            sharding: Box::new(BalancedSharding::default()),
+            error_callback: Box::new(LogErrorCallback),
+            batch_size: Some(1_048_576),

Review Comment:
   Maybe we could keep this number as a const value? It's reused on the next 
line too.



##########
core/sdk/src/clients/producer_builder.rs:
##########
@@ -226,21 +398,46 @@ impl IggyProducerBuilder {
         }
     }
 
-    /// Builds the producer.
+    /// Configures the producer to use synchronous (immediate) sending mode.
     ///
-    /// Note: After building the producer, `init()` must be invoked before 
producing messages.
+    /// In sync mode, messages are sent immediately on `.send()` without 
background buffering.
+    ///
+    /// # Arguments
+    /// * `f` - A closure that modifies the `SyncBuilder` configuration.
+    pub fn sync<F>(mut self, f: F) -> Self

Review Comment:
   Maybe let's call the parameter something like `factory` (or similar), so 
it's a bit more meaningful.



##########
core/sdk/src/clients/mod.rs:
##########
@@ -32,6 +32,10 @@ pub mod consumer;
 pub mod consumer_builder;
 pub mod producer;
 pub mod producer_builder;
+pub mod producer_config;
+pub mod producer_dispatcher;
+pub mod producer_error_callback;
+pub mod producer_sharding;
 
 const ORDERING: std::sync::atomic::Ordering = 
std::sync::atomic::Ordering::SeqCst;
-const MAX_BATCH_SIZE: usize = 1000000;
+const MAX_BATCH_LENGTH: usize = 1000000;

Review Comment:
   One can always try :D



##########
core/sdk/src/clients/producer_builder.rs:
##########
@@ -15,27 +15,234 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use super::MAX_BATCH_SIZE;
+use super::MAX_BATCH_LENGTH;
+use crate::clients::producer_config::{BackgroundConfig, BackpressureMode, 
SyncConfig};
+use crate::clients::producer_error_callback::ErrorCallback;
+use crate::clients::producer_error_callback::LogErrorCallback;
+use crate::clients::producer_sharding::{BalancedSharding, Sharding};
 use crate::prelude::IggyProducer;
 use iggy_binary_protocol::Client;
 use iggy_common::locking::IggySharedMut;
 use iggy_common::{
-    EncryptorKind, Identifier, IggyDuration, IggyExpiry, MaxTopicSize, 
Partitioner, Partitioning,
+    EncryptorKind, Identifier, IggyByteSize, IggyDuration, IggyExpiry, 
MaxTopicSize, Partitioner,
+    Partitioning,
 };
 use std::sync::Arc;
 
-#[derive(Debug)]
+pub struct BackgroundBuilder {

Review Comment:
   Yeah, something like `direct`, `immediate`, `instant` etc. makes sense I 
think :)



##########
core/sdk/src/clients/producer_sharding.rs:
##########
@@ -0,0 +1,437 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+
+use iggy_common::{Identifier, IggyByteSize, IggyError, IggyMessage, 
Partitioning, Sizeable};
+use tokio::sync::{Notify, OwnedSemaphorePermit};
+use tokio::task::JoinHandle;
+use tracing::error;
+
+use crate::clients::producer::ProducerCoreBackend;
+use crate::clients::producer_config::BackgroundConfig;
+use crate::clients::producer_error_callback::ErrorCtx;
+
+/// A strategy for distributing messages across shards.
+///
+/// Implementors of this trait define how to choose a shard for a given batch 
of messages.
+/// This allows customizing message routing based on message content, 
stream/topic identifiers,
+/// or round-robin load balancing.
+pub trait Sharding: Send + Sync + std::fmt::Debug + 'static {
+    fn pick_shard(
+        &self,
+        num_shards: usize,
+        messages: &[IggyMessage],
+        stream: &Identifier,
+        topic: &Identifier,
+    ) -> usize;
+}
+
+/// A simple round-robin sharding strategy.
+/// Distributes messages evenly across all shards by incrementing an atomic 
counter.
+#[derive(Default, Debug)]
+pub struct BalancedSharding {
+    counter: AtomicUsize,
+}
+
+impl Sharding for BalancedSharding {
+    /// Picks the next shard in a round-robin fashion.
+    fn pick_shard(
+        &self,
+        num_shards: usize,
+        _: &[IggyMessage],
+        _: &Identifier,
+        _: &Identifier,
+    ) -> usize {
+        self.counter.fetch_add(1, Ordering::Relaxed) % num_shards
+    }
+}
+
+#[derive(Debug)]
+pub struct ShardMessage {
+    pub stream: Arc<Identifier>,
+    pub topic: Arc<Identifier>,
+    pub messages: Vec<IggyMessage>,
+    pub partitioning: Option<Arc<Partitioning>>,
+}
+
+impl Sizeable for ShardMessage {
+    fn get_size_bytes(&self) -> IggyByteSize {
+        let mut total = IggyByteSize::new(0);
+        total += self.stream.get_size_bytes();
+        total += self.topic.get_size_bytes();
+        for msg in &self.messages {
+            total += msg.get_size_bytes();
+        }
+        total
+    }
+}
+
+pub struct ShardMessageWithPermits {
+    pub inner: ShardMessage,
+    _bytes_permit: Option<OwnedSemaphorePermit>,
+    _slot_permit: Option<OwnedSemaphorePermit>,
+}
+
+impl ShardMessageWithPermits {
+    pub fn new(
+        msg: ShardMessage,
+        permit_bytes: OwnedSemaphorePermit,
+        permit_slot: OwnedSemaphorePermit,
+    ) -> Self {
+        Self {
+            inner: msg,
+            _bytes_permit: Some(permit_bytes),
+            _slot_permit: Some(permit_slot),
+        }
+    }
+}
+
+pub struct Shard {
+    tx: flume::Sender<ShardMessageWithPermits>,
+    shutdown_notify: Arc<Notify>,
+    closed: Arc<AtomicBool>,
+    _handle: JoinHandle<()>,
+}
+
+impl Shard {
+    pub fn new(
+        core: Arc<impl ProducerCoreBackend>,
+        config: Arc<BackgroundConfig>,
+        err_sender: flume::Sender<ErrorCtx>,
+    ) -> Self {
+        let (tx, rx) = flume::bounded::<ShardMessageWithPermits>(256);
+        let shutdown_notify = Arc::new(Notify::new());
+        let closed = Arc::new(AtomicBool::new(false));
+
+        let shutdown_notify_clone = shutdown_notify.clone();
+        let closed_clone = closed.clone();
+        let handle = tokio::spawn(async move {
+            let mut buffer = Vec::new();
+            let mut buffer_bytes = 0;
+            let mut last_flush = tokio::time::Instant::now();
+
+            loop {
+                let deadline = last_flush + config.linger_time.get_duration();
+                tokio::select! {
+                    maybe_msg = rx.recv_async() => {
+                        match maybe_msg {
+                            Ok(msg) => {
+                                buffer_bytes += 
msg.inner.get_size_bytes().as_bytes_usize();
+                                buffer.push(msg);
+
+                                let exceed_batch_len = 
config.batch_length.is_some_and(|len| buffer.len() >= len);
+                                let exceed_batch_size = 
config.batch_size.is_some_and(|size| buffer_bytes >= size);
+

Review Comment:
   I think we could add some useful debug-level logs here (and within 
`flush_buffer()`, etc.), just to see what's happening when that level is enabled.



##########
core/sdk/src/clients/producer_builder.rs:
##########
@@ -226,21 +198,31 @@ impl IggyProducerBuilder {
         }
     }
 
-    /// Builds the producer.
-    ///
-    /// Note: After building the producer, `init()` must be invoked before 
producing messages.
+    /// Sets the producer to use synchronous (direct) message sending.
+    /// This mode ensures that messages are sent immediately to the server
+    /// without being buffered or delayed.
+    pub fn sync(mut self, config: SyncConfig) -> Self {

Review Comment:
   Should we keep the `sync` name? The mention of "synchronous" might be a bit 
misleading, since it's an async function, not a blocking sync call.



##########
core/common/src/error/iggy_error.rs:
##########
@@ -363,6 +365,23 @@ pub enum IggyError {
     TooSmallMessage(u32, u32) = 4037,
     #[error("Cannot sed messages due to client disconnection")]
     CannotSendMessagesDueToClientDisconnection = 4050,
+    #[error("Background send error")]
+    BackgroundSendError = 4051,
+    #[error("Background send timeout")]
+    BackgroundSendTimeout = 4052,
+    #[error("Background send buffer is full")]
+    BackgroundSendBufferFull = 4053,
+    #[error("Background worker disconnected")]
+    BackgroundWorkerDisconnected = 4054,
+    #[error("Background send buffer overflow")]
+    BackgroundSendBufferOverflow = 4055,
+    #[error("Producer send failed")]
+    ProducerSendFailed {
+        cause: String,

Review Comment:
   Maybe the `cause` could be its own enum? It could have multiple 
variants, e.g. one wrapping the universal `IggyError` as well as custom ones.



##########
core/sdk/src/clients/producer_error_callback.rs:
##########
@@ -0,0 +1,66 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use iggy_common::{Identifier, IggyMessage, Partitioning};
+use std::fmt::Debug;
+use std::pin::Pin;
+use std::sync::Arc;
+use tracing::error;
+
+#[derive(Debug)]
+pub struct ErrorCtx {
+    pub cause: String,
+    pub stream: Arc<Identifier>,
+    pub topic: Arc<Identifier>,
+    pub partitioning: Option<Arc<Partitioning>>,
+    pub messages: Arc<Vec<IggyMessage>>,
+}
+
+/// A trait for handling background sending errors.
+///
+/// This is used when a message batch fails to send in an asynchronous 
background task.
+/// Implementors can define custom logic such as logging, retrying, alerting, 
etc.
+pub trait ErrorCallback: Send + Sync + Debug + 'static {
+    fn call(&self, ctx: ErrorCtx) -> Pin<Box<dyn Future<Output = ()> + Send + 
'static>>;
+}
+
+/// Default implementation of [`ErrorCallback`] that logs the error using 
`tracing::error!`.
+///
+/// Logs include stream, topic, optional partitioning, number of messages, and 
the cause.
+#[derive(Debug, Default)]
+pub struct LogErrorCallback;
+
+impl ErrorCallback for LogErrorCallback {
+    fn call(&self, ctx: ErrorCtx) -> Pin<Box<dyn Future<Output = ()> + Send + 
'static>> {
+        Box::pin(async move {
+            let partitioning = ctx
+                .partitioning
+                .as_ref()
+                .map(|p| format!("{:?}", p))
+                .unwrap_or_else(|| "None".to_string());
+
+            error!(
+                cause = ctx.cause,
+                stream = %ctx.stream,
+                topic = %ctx.topic,
+                partitioning = %partitioning,
+                num_messages = ctx.messages.len(),
+                "Failed to send messages in background task",

Review Comment:
   Let's also include the stream and topic names (and the cause) in the 
error log message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to