This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch feat/add-background-send
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/feat/add-background-send by 
this push:
     new 09107ec0 del
09107ec0 is described below

commit 09107ec05da0458a78dad1eb5aca51c8fcc1181d
Author: haze518 <[email protected]>
AuthorDate: Wed Jun 4 10:33:01 2025 +0600

    del
---
 Cargo.lock                                         |   1 +
 core/examples/src/multi-tenant/producer/main.rs    |   9 +-
 core/examples/src/new-sdk/producer/main.rs         |  10 +-
 core/integration/tests/mod.rs                      |   1 +
 core/integration/tests/sdk/mod.rs                  |   0
 core/sdk/Cargo.toml                                |   3 +
 core/sdk/src/clients/producer.rs                   | 172 ++++++------
 core/sdk/src/clients/producer_builder.rs           |  17 +-
 core/sdk/src/clients/producer_config.rs            |   2 +-
 core/sdk/src/clients/producer_dispatcher.rs        | 291 ++++++++++++++++-----
 core/sdk/src/clients/producer_sharding.rs          | 183 ++++++++++++-
 core/sdk/src/prelude.rs                            |   2 +-
 .../stream_builder/build/build_iggy_producer.rs    |   4 +-
 .../sdk/src/stream_builder/iggy_stream_producer.rs |   3 +-
 14 files changed, 536 insertions(+), 162 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 9f632ea7..39e25fcb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3547,6 +3547,7 @@ dependencies = [
  "futures-util",
  "iggy_binary_protocol",
  "iggy_common",
+ "mockall",
  "num_cpus",
  "quinn",
  "reqwest",
diff --git a/core/examples/src/multi-tenant/producer/main.rs 
b/core/examples/src/multi-tenant/producer/main.rs
index 28dc9759..f867faa7 100644
--- a/core/examples/src/multi-tenant/producer/main.rs
+++ b/core/examples/src/multi-tenant/producer/main.rs
@@ -260,10 +260,13 @@ async fn create_producers(
     let mut producers = Vec::new();
     for topic in topics {
         for id in 1..=producers_count {
-            let mut producer = client
+            let producer = client
                 .producer(stream, topic)?
-                .batch_length(batch_length)
-                .linger_time(IggyDuration::from_str(interval).expect("Invalid 
duration"))
+                .sync(|b|
+                        b
+                            .batch_length(batch_length)
+                            
.linger_time(IggyDuration::from_str(interval).expect("Invalid duration"))
+                )
                 .partitioning(Partitioning::balanced())
                 .create_topic_if_not_exists(
                     partitions_count,
diff --git a/core/examples/src/new-sdk/producer/main.rs 
b/core/examples/src/new-sdk/producer/main.rs
index 52f60e7c..0da9afea 100644
--- a/core/examples/src/new-sdk/producer/main.rs
+++ b/core/examples/src/new-sdk/producer/main.rs
@@ -43,10 +43,14 @@ async fn main() -> anyhow::Result<(), Box<dyn Error>> {
     let client = client_provider::get_raw_client(client_provider_config, 
false).await?;
     let client = IggyClient::builder().with_client(client).build()?;
     client.connect().await?;
-    let mut producer = client
+    let interval = IggyDuration::from_str(&args.interval)?;
+    let producer = client
         .producer(&args.stream_id, &args.topic_id)?
-        .batch_length(args.messages_per_batch)
-        .linger_time(IggyDuration::from_str(&args.interval)?)
+        .sync(|b|
+            b
+                .batch_length(args.messages_per_batch)
+                .linger_time(interval)
+        )
         .partitioning(Partitioning::balanced())
         .create_topic_if_not_exists(
             3,
diff --git a/core/integration/tests/mod.rs b/core/integration/tests/mod.rs
index 32c49b8d..e29f3d5e 100644
--- a/core/integration/tests/mod.rs
+++ b/core/integration/tests/mod.rs
@@ -34,6 +34,7 @@ mod examples;
 mod server;
 mod state;
 mod streaming;
+mod sdk;
 
 lazy_static! {
     static ref TESTS_FAILED: AtomicBool = AtomicBool::new(false);
diff --git a/core/integration/tests/sdk/mod.rs 
b/core/integration/tests/sdk/mod.rs
new file mode 100644
index 00000000..e69de29b
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index 32d1cb2f..fd42366e 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -60,3 +60,6 @@ tokio-rustls = { workspace = true }
 tracing = { workspace = true }
 trait-variant = { workspace = true }
 webpki-roots = { workspace = true }
+
+[dev-dependencies]
+mockall.workspace = true
diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index 88a44204..c63ae01e 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -35,6 +35,20 @@ use std::time::Duration;
 use tokio::time::{Interval, sleep};
 use tracing::{error, info, trace, warn};
 
+#[cfg(test)]
+use mockall::automock;
+
+#[cfg_attr(test, automock)]
+pub trait ProducerCoreBackend: Send + Sync + 'static {
+    fn send_internal(
+        &self,
+        stream: &Identifier,
+        topic: &Identifier,
+        msgs: Vec<IggyMessage>,
+        partitioning: Option<Arc<Partitioning>>,
+    ) -> impl Future<Output = Result<(), IggyError>> + Send;
+}
+
 pub struct ProducerCore {
     initialized: AtomicBool,
     can_send: Arc<AtomicBool>,
@@ -168,82 +182,6 @@ impl ProducerCore {
         });
     }
 
-    pub(crate) async fn send_internal(
-        &self,
-        stream: &Identifier,
-        topic: &Identifier,
-        mut msgs: Vec<IggyMessage>,
-        partitioning: Option<Arc<Partitioning>>,
-    ) -> Result<(), IggyError> {
-        if msgs.is_empty() {
-            return Ok(());
-        }
-
-        if let Err(err) = self.encrypt_messages(&mut msgs) {
-            return Err(IggyError::ProducerSendFailed {
-                cause: err.to_string(),
-                failed: Arc::new(msgs),
-            });
-        }
-
-        let part = match self.get_partitioning(stream, topic, &msgs, 
partitioning.clone()) {
-            Ok(p) => p,
-            Err(err) => {
-                return Err(IggyError::ProducerSendFailed {
-                    cause: err.to_string(),
-                    failed: Arc::new(msgs),
-                });
-            }
-        };
-
-        match &self.sync_config {
-            Some(cfg) => {
-                let linger_time_micros = match cfg.linger_time {
-                    Some(t) => t.as_micros(),
-                    None => 0
-                };
-                if linger_time_micros > 0 {
-                    Self::wait_before_sending(
-                        linger_time_micros,
-                        self.last_sent_at.load(ORDERING),
-                    )
-                    .await;
-                }
-
-                let max = cfg.batch_length;
-                let mut index = 0;
-                while index < msgs.len() {
-                    let end = (index + max).min(msgs.len());
-                    let chunk = &mut msgs[index..end];
-
-                    if let Err(err) = self.try_send_messages(stream, topic, 
&part, chunk).await {
-                        let failed_tail = msgs.split_off(index);
-                        return Err(IggyError::ProducerSendFailed {
-                            cause: err.to_string(),
-                            failed: Arc::new(failed_tail),
-                        });
-                    }
-                    self.last_sent_at
-                        .store(IggyTimestamp::now().into(), ORDERING);
-                    index = end;
-                }
-            }
-            // background send on
-            _ => {
-                self.try_send_messages(stream, topic, &part, &mut msgs)
-                    .await
-                    .map_err(|err| IggyError::ProducerSendFailed {
-                        cause: err.to_string(),
-                        failed: Arc::new(msgs),
-                    })?;
-                self.last_sent_at
-                    .store(IggyTimestamp::now().into(), ORDERING);
-            }
-        }
-
-        Ok(())
-    }
-
     async fn try_send_messages(
         &self,
         stream: &Identifier,
@@ -414,6 +352,81 @@ impl ProducerCore {
     }
 }
 
+impl ProducerCoreBackend for ProducerCore {
+    async fn send_internal(
+        &self,
+        stream: &Identifier,
+        topic: &Identifier,
+        mut msgs: Vec<IggyMessage>,
+        partitioning: Option<Arc<Partitioning>>,
+    ) -> Result<(), IggyError> {
+        if msgs.is_empty() {
+            return Ok(());
+        }
+
+        if let Err(err) = self.encrypt_messages(&mut msgs) {
+            return Err(IggyError::ProducerSendFailed {
+                cause: err.to_string(),
+                failed: Arc::new(msgs),
+            });
+        }
+
+        let part = match self.get_partitioning(stream, topic, &msgs, 
partitioning.clone()) {
+            Ok(p) => p,
+            Err(err) => {
+                return Err(IggyError::ProducerSendFailed {
+                    cause: err.to_string(),
+                    failed: Arc::new(msgs),
+                });
+            }
+        };
+
+        match &self.sync_config {
+            Some(cfg) => {
+                let linger_time_micros = match cfg.linger_time {
+                    Some(t) => t.as_micros(),
+                    None => 0,
+                };
+                if linger_time_micros > 0 {
+                    Self::wait_before_sending(linger_time_micros, 
self.last_sent_at.load(ORDERING))
+                        .await;
+                }
+
+                let max = cfg.batch_length;
+                let mut index = 0;
+                while index < msgs.len() {
+                    let end = (index + max).min(msgs.len());
+                    let chunk = &mut msgs[index..end];
+
+                    if let Err(err) = self.try_send_messages(stream, topic, 
&part, chunk).await {
+                        let failed_tail = msgs.split_off(index);
+                        return Err(IggyError::ProducerSendFailed {
+                            cause: err.to_string(),
+                            failed: Arc::new(failed_tail),
+                        });
+                    }
+                    self.last_sent_at
+                        .store(IggyTimestamp::now().into(), ORDERING);
+                    index = end;
+                }
+            }
+            // background send on
+            _ => {
+                self.try_send_messages(stream, topic, &part, &mut msgs)
+                    .await
+                    .map_err(|err| IggyError::ProducerSendFailed {
+                        cause: err.to_string(),
+                        failed: Arc::new(msgs),
+                    })?;
+                self.last_sent_at
+                    .store(IggyTimestamp::now().into(), ORDERING);
+            }
+        }
+
+        Ok(())
+    }
+}
+
 unsafe impl Send for IggyProducer {}
 unsafe impl Sync for IggyProducer {}
 
@@ -466,7 +479,7 @@ impl IggyProducer {
             send_retries_interval,
             sync_config: match mode {
                 SendMode::Sync(ref cfg) => Some(cfg.clone()),
-                _ => None
+                _ => None,
             },
         });
         let dispatcher = match mode {
@@ -568,8 +581,3 @@ impl IggyProducer {
         }
     }
 }
-
-fn default_shard_count() -> usize {
-    let cpus = num_cpus::get();
-    cpus.clamp(2, 16)
-}
diff --git a/core/sdk/src/clients/producer_builder.rs 
b/core/sdk/src/clients/producer_builder.rs
index 6f3e001b..bc2184d7 100644
--- a/core/sdk/src/clients/producer_builder.rs
+++ b/core/sdk/src/clients/producer_builder.rs
@@ -19,7 +19,8 @@ use super::MAX_BATCH_LENGTH;
 use crate::clients::producer_config::{BackgroundConfig, BackpressureMode, 
SyncConfig};
 use crate::clients::producer_error_callback::LogErrorCallback;
 use crate::clients::producer_sharding::{BalancedSharding, Sharding};
-use crate::prelude::{ErrorCallback, IggyProducer};
+use crate::clients::producer_error_callback::ErrorCallback;
+use crate::prelude::IggyProducer;
 use iggy_binary_protocol::Client;
 use iggy_common::locking::IggySharedMut;
 use iggy_common::{
@@ -99,6 +100,20 @@ impl BackgroundBuilder {
         }
     }
 
+    pub fn batch_size(self, value: usize) -> Self {
+        Self {
+            batch_size: Some(value),
+            ..self
+        }
+    }
+
+    pub fn sharding(self, sharding: Box<dyn Sharding>) -> Self {
+        Self {
+            sharding,
+            ..self
+        }
+    }
+
     pub fn max_buffer_size(self, value: IggyByteSize) -> Self {
         Self {
             max_buffer_size: Some(value),
diff --git a/core/sdk/src/clients/producer_config.rs 
b/core/sdk/src/clients/producer_config.rs
index 0865a05c..2a94bca1 100644
--- a/core/sdk/src/clients/producer_config.rs
+++ b/core/sdk/src/clients/producer_config.rs
@@ -19,8 +19,8 @@ use std::sync::Arc;
 
 use iggy_common::{IggyByteSize, IggyDuration};
 
+use crate::clients::producer_error_callback::ErrorCallback;
 use crate::clients::producer_sharding::Sharding;
-use crate::prelude::ErrorCallback;
 
 #[derive(Debug, Clone)]
 /// Determines how the `send_messages` API should behave when problem is 
encountered
diff --git a/core/sdk/src/clients/producer_dispatcher.rs 
b/core/sdk/src/clients/producer_dispatcher.rs
index 0cc38aa2..c318e34b 100644
--- a/core/sdk/src/clients/producer_dispatcher.rs
+++ b/core/sdk/src/clients/producer_dispatcher.rs
@@ -15,16 +15,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-use crate::clients::producer::ProducerCore;
+use crate::clients::producer::{ProducerCore, ProducerCoreBackend};
 use crate::clients::producer_config::{BackgroundConfig, BackpressureMode};
 use crate::clients::producer_error_callback::ErrorCtx;
 use crate::clients::producer_sharding::{Shard, ShardMessage};
+use futures::FutureExt;
+use iggy_common::{BytesSerializable, Identifier, IggyError, IggyMessage, 
Partitioning, SendMessages, Sizeable};
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use futures::FutureExt;
-use iggy_common::{
-    Identifier, IggyError, IggyMessage, Partitioning, Sizeable,
-};
 
 use tokio::{sync::Notify, task::JoinHandle};
 
@@ -34,13 +32,14 @@ pub struct ProducerDispatcher {
     notify: Arc<Notify>,
     config: Arc<BackgroundConfig>,
     closed: AtomicBool,
+    sender: flume::Sender<ShardMessage>,
     _join_handle: JoinHandle<()>,
 }
 
 impl ProducerDispatcher {
-    pub fn new(core: Arc<ProducerCore>, config: BackgroundConfig) -> Self {
+    pub fn new(core: Arc<impl ProducerCoreBackend>, config: BackgroundConfig) 
-> Self {
         let mut shards = Vec::with_capacity(config.num_shards);
-        let current_buffered_size = Arc::new(AtomicUsize::new(0));
+        let global_buffer = Arc::new(AtomicUsize::new(0));
         let notify = Arc::new(Notify::new());
         let config = Arc::new(config);
 
@@ -58,10 +57,78 @@ impl ProducerDispatcher {
             tracing::debug!("error-callback worker finished");
         });
 
+        let (tx, rx) = flume::bounded::<ShardMessage>(1);
+        let gb = global_buffer.clone();
+        let cfg = config.clone();
+        let not = notify.clone();
+        let _handle = tokio::spawn(async move {
+            loop {
+                match rx.recv_async().await {
+                    Ok(msg) => {
+                        let mut reserved = gb.load(Ordering::Relaxed);
+                        if let Some(buffer_size) = cfg.max_buffer_size {
+                            let batch_bytes = msg.get_size_bytes();
+                            if batch_bytes.as_bytes_usize() > 
buffer_size.as_bytes_usize() {
+                                return 
Err(IggyError::BackgroundSendBufferOverflow);
+                            }
+                            loop {
+                                if buffer_size.as_bytes_usize() != 0
+                                    && reserved + batch_bytes.as_bytes_usize()
+                                        >= buffer_size.as_bytes_usize()
+                                {
+                                    match cfg.failure_mode {
+                                        BackpressureMode::Block => {
+                                            not.notified().await;
+                                            continue;
+                                        }
+                                        BackpressureMode::BlockWithTimeout(t) 
=> {
+                                            if tokio::time::timeout(
+                                                t.get_duration(),
+                                                not.notified(),
+                                            )
+                                            .await
+                                            .is_err()
+                                            {
+                                                return 
Err(IggyError::BackgroundSendTimeout);
+                                            }
+                                            continue;
+                                        }
+                                        BackpressureMode::FailImmediately => {
+                                            return 
Err(IggyError::BackgroundSendBufferOverflow);
+                                        }
+                                    };
+                                }
+                                match gb.compare_exchange(
+                                    reserved,
+                                    reserved + batch_bytes.as_bytes_usize(),
+                                    Ordering::AcqRel,
+                                    Ordering::Acquire,
+                                ) {
+                                    Ok(_) => break,
+                                    Err(v) => reserved = v,
+                                }
+                            }
+                        }
+
+                        let shard_ix = cfg.sharding.pick_shard(
+                            &self.shards,
+                            &msg.messages,
+                            &msg.stream,
+                            &msg.topic,
+                        );
+                        debug_assert!(shard_ix < self.shards.len());
+                        let shard = &self.shards[shard_ix];
+                        shard.send(shard_message).await
+                    }
+                    Err(_) => break,
+                }
+            }
+        });
+
         for _ in 0..config.num_shards {
             shards.push(Shard::new(
                 core.clone(),
-                current_buffered_size.clone(),
+                global_buffer.clone(),
                 config.clone(),
                 notify.clone(),
                 err_tx.clone(),
@@ -70,7 +137,7 @@ impl ProducerDispatcher {
 
         Self {
             shards,
-            global_buffer: current_buffered_size,
+            global_buffer,
             config,
             notify,
             closed: AtomicBool::new(false),
@@ -89,63 +156,12 @@ impl ProducerDispatcher {
             return Err(IggyError::ProducerClosed);
         }
 
-        let shard_message = ShardMessage {
+        self.sender.send_async(ShardMessage {
             messages,
             stream,
             topic,
             partitioning,
-        };
-        let batch_bytes = shard_message.get_size_bytes();
-
-        let mut reserved = self.global_buffer.load(Ordering::Relaxed);
-        if let Some(buffer_size) = &self.config.max_buffer_size {
-            if batch_bytes.as_bytes_usize() > buffer_size.as_bytes_usize() {
-                return Err(IggyError::BackgroundSendBufferOverflow);
-            }
-            loop {
-                if buffer_size.as_bytes_usize() != 0
-                    && reserved + batch_bytes.as_bytes_usize() > 
buffer_size.as_bytes_usize()
-                {
-                    match self.config.failure_mode {
-                        BackpressureMode::Block => {
-                            self.notify.notified().await;
-                            continue;
-                        }
-                        BackpressureMode::BlockWithTimeout(t) => {
-                            if tokio::time::timeout(t.get_duration(), 
self.notify.notified())
-                                .await
-                                .is_err()
-                            {
-                                return Err(IggyError::BackgroundSendTimeout);
-                            }
-                            continue;
-                        }
-                        BackpressureMode::FailImmediately => {
-                            return 
Err(IggyError::BackgroundSendBufferOverflow);
-                        }
-                    };
-                }
-                match self.global_buffer.compare_exchange(
-                    reserved,
-                    reserved + batch_bytes.as_bytes_usize(),
-                    Ordering::AcqRel,
-                    Ordering::Acquire,
-                ) {
-                    Ok(_) => break,
-                    Err(v) => reserved = v,
-                }
-            }
-        }
-
-        let shard_ix = self.config.sharding.pick_shard(
-            &self.shards,
-            &shard_message.messages,
-            &shard_message.stream,
-            &shard_message.topic,
-        );
-        debug_assert!(shard_ix < self.shards.len());
-        let shard = &self.shards[shard_ix];
-        shard.send(shard_message).await
+        }).await.map_err(|_| IggyError::BackgroundSendError)
     }
 
     pub async fn shutdown(mut self) {
@@ -163,3 +179,154 @@ impl ProducerDispatcher {
         debug_assert_eq!(self.global_buffer.load(Ordering::Relaxed), 0);
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::time::Duration;
+
+    use bytes::Bytes;
+    use tokio::sync::Mutex;
+    use tokio::time::{sleep, timeout};
+
+    use crate::clients::producer::MockProducerCoreBackend;
+    use crate::clients::producer_builder::BackgroundBuilder;
+
+    use super::*;
+
+    fn dummy_identifier() -> Arc<Identifier> {
+        Arc::new(Identifier::numeric(1).unwrap())
+    }
+
+    fn dummy_message(size: usize) -> IggyMessage {
+        IggyMessage::builder()
+            .payload(Bytes::from(vec![0u8; size]))
+            .build()
+            .unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_dispatch_successful() {
+        let mut mock = MockProducerCoreBackend::new();
+        mock.expect_send_internal()
+            .times(1)
+            .returning(|_, _, _, _| Box::pin(async { Ok(()) }));
+
+        let msg = ShardMessage {
+            stream: dummy_identifier(),
+            topic: dummy_identifier(),
+            messages: vec![dummy_message(5)],
+            partitioning: None,
+        };
+        let config = BackgroundBuilder::default()
+            .max_buffer_size(msg.get_size_bytes() + 100.into())
+            .num_shards(1)
+            .build();
+
+        let dispatcher = ProducerDispatcher::new(Arc::new(mock), config);
+
+        let result = dispatcher
+            .dispatch(msg.messages, msg.topic, msg.stream, msg.partitioning)
+            .await;
+
+        sleep(Duration::from_millis(100)).await;
+
+        assert!(result.is_ok());
+    }
+
+    #[tokio::test]
+    async fn test_dispatch_buffer_overflow_immediate_fail() {
+        let mock = MockProducerCoreBackend::new();
+
+        let msg = ShardMessage {
+            stream: dummy_identifier(),
+            topic: dummy_identifier(),
+            messages: vec![dummy_message(5)],
+            partitioning: None,
+        };
+        let config = BackgroundBuilder::default()
+            .max_buffer_size(msg.get_size_bytes() - 10.into())
+            .failure_mode(BackpressureMode::FailImmediately)
+            .num_shards(1)
+            .build();
+
+        let dispatcher = ProducerDispatcher::new(Arc::new(mock), config);
+
+        let result = dispatcher
+            .dispatch(msg.messages, msg.topic, msg.stream, msg.partitioning)
+            .await;
+
+        assert!(matches!(
+            result,
+            Err(IggyError::BackgroundSendBufferOverflow)
+        ));
+    }
+
+    #[tokio::test]
+    async fn test_dispatch_buffer_overflow_block_with_timeout() {
+        let mock = MockProducerCoreBackend::new();
+
+        let msg = ShardMessage {
+            stream: dummy_identifier(),
+            topic: dummy_identifier(),
+            messages: vec![dummy_message(5)],
+            partitioning: None,
+        };
+        let config = BackgroundBuilder::default()
+            .max_buffer_size(msg.get_size_bytes() + 100.into())
+            .failure_mode(BackpressureMode::BlockWithTimeout(
+                Duration::from_millis(100).into(),
+            ))
+            .num_shards(1)
+            .build();
+
+        let dispatcher = ProducerDispatcher::new(Arc::new(mock), config);
+        dispatcher.global_buffer.store(100, Ordering::Relaxed);
+
+        let result = dispatcher
+            .dispatch(msg.messages, msg.topic, msg.stream, msg.partitioning)
+            .await;
+
+        assert!(matches!(result, Err(IggyError::BackgroundSendTimeout)));
+    }
+
+    #[tokio::test]
+    async fn test_dispatch_buffer_overflow_block_then_continue() {
+        let mut mock = MockProducerCoreBackend::new();
+        mock.expect_send_internal()
+            .times(1)
+            .returning(|_, _, _, _| Box::pin(async { Ok(()) }));
+
+        let msg = ShardMessage {
+            stream: dummy_identifier(),
+            topic: dummy_identifier(),
+            messages: vec![dummy_message(5)],
+            partitioning: None,
+        };
+        let config = BackgroundBuilder::default()
+            .max_buffer_size(msg.get_size_bytes() + 100.into())
+            .failure_mode(BackpressureMode::Block)
+            .num_shards(1)
+            .build();
+
+        let dispatcher = ProducerDispatcher::new(Arc::new(mock), config);
+        dispatcher.global_buffer.store(50, Ordering::Relaxed);
+
+        let notify = dispatcher.notify.clone();
+
+        let global_buffer = dispatcher.global_buffer.clone();
+        tokio::spawn(async move {
+            tokio::time::sleep(Duration::from_millis(150)).await;
+            global_buffer.fetch_sub(50, Ordering::Relaxed);
+            notify.notify_waiters();
+        });
+
+        let result = dispatcher
+            .dispatch(msg.messages, msg.topic, msg.stream, msg.partitioning)
+            .await;
+
+        dispatcher.notify.notified().await;
+        sleep(Duration::from_millis(100)).await;
+
+        assert!(result.is_ok())
+    }
+}
diff --git a/core/sdk/src/clients/producer_sharding.rs 
b/core/sdk/src/clients/producer_sharding.rs
index 077809fb..3ae5f682 100644
--- a/core/sdk/src/clients/producer_sharding.rs
+++ b/core/sdk/src/clients/producer_sharding.rs
@@ -23,7 +23,7 @@ use tokio::sync::Notify;
 use tokio::task::JoinHandle;
 use tracing::error;
 
-use crate::clients::producer::ProducerCore;
+use crate::clients::producer::ProducerCoreBackend;
 use crate::clients::producer_config::BackgroundConfig;
 use crate::clients::producer_error_callback::ErrorCtx;
 
@@ -87,14 +87,14 @@ impl Sizeable for ShardMessage {
     }
 }
 
-pub(crate) struct Shard {
+pub struct Shard {
     tx: flume::Sender<ShardMessage>,
     _handle: JoinHandle<()>,
 }
 
 impl Shard {
     pub fn new(
-        core: Arc<ProducerCore>,
+        core: Arc<impl ProducerCoreBackend>,
         global_buffer: Arc<AtomicUsize>,
         config: Arc<BackgroundConfig>,
         notify: Arc<Notify>,
@@ -146,7 +146,7 @@ impl Shard {
     }
 
     async fn flush_buffer(
-        core: &Arc<ProducerCore>,
+        core: &Arc<impl ProducerCoreBackend>,
         buffer: &mut Vec<ShardMessage>,
         buffer_bytes: &mut usize,
         global_buffer: &Arc<AtomicUsize>,
@@ -198,3 +198,178 @@ impl Shard {
         let _ = self._handle.await;
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::time::Duration;
+
+    use bytes::Bytes;
+    use iggy_common::IggyDuration;
+    use tokio::{sync::Notify, time::sleep};
+
+    use super::*;
+    use crate::clients::{producer::MockProducerCoreBackend, 
producer_builder::BackgroundBuilder};
+
+    fn dummy_identifier() -> Arc<Identifier> {
+        Arc::new(Identifier::numeric(1).unwrap())
+    }
+
+    fn dummy_message(size: usize) -> IggyMessage {
+        IggyMessage::builder()
+            .payload(Bytes::from(vec![0u8; size]))
+            .build()
+            .unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_shard_flushes_by_batch_length() {
+        let mut mock = MockProducerCoreBackend::new();
+        mock.expect_send_internal()
+            .times(10)
+            .returning(|_, _, _, _| Box::pin(async { Ok(()) }));
+
+        let bb = BackgroundBuilder::default()
+            .batch_length(10)
+            .linger_time(IggyDuration::new_from_secs(1))
+            .batch_size(10_000);
+        let shard = Shard::new(
+            Arc::new(mock),
+            Arc::new(AtomicUsize::new(0)),
+            Arc::new(bb.build()),
+            Arc::new(Notify::new()),
+            flume::unbounded().0,
+        );
+
+        for _ in 0..10 {
+            shard
+                .send(ShardMessage {
+                    stream: dummy_identifier(),
+                    topic: dummy_identifier(),
+                    messages: vec![dummy_message(1)],
+                    partitioning: None,
+                })
+                .await
+                .unwrap();
+        }
+
+        sleep(Duration::from_millis(100)).await;
+    }
+
+    #[tokio::test]
+    async fn test_shard_flushes_by_batch_size() {
+        let mut mock = MockProducerCoreBackend::new();
+        mock.expect_send_internal()
+            .times(1)
+            .returning(|_, _, _, _| Box::pin(async { Ok(()) }));
+
+        let bb = BackgroundBuilder::default()
+            .batch_length(1000)
+            .linger_time(IggyDuration::new_from_secs(1))
+            .batch_size(10_000);
+        let shard = Shard::new(
+            Arc::new(mock),
+            Arc::new(AtomicUsize::new(0)),
+            Arc::new(bb.build()),
+            Arc::new(Notify::new()),
+            flume::unbounded().0,
+        );
+
+        shard
+            .send(ShardMessage {
+                stream: dummy_identifier(),
+                topic: dummy_identifier(),
+                messages: vec![dummy_message(10_000)],
+                partitioning: None,
+            })
+            .await
+            .unwrap();
+
+        sleep(Duration::from_millis(100)).await;
+    }
+
+    #[tokio::test]
+    async fn test_shard_flushes_by_timeout() {
+        let mut mock = MockProducerCoreBackend::new();
+        mock.expect_send_internal().times(1).returning(|_, _, _, _| 
Box::pin(async { Ok(()) }));
+
+        let bb = BackgroundBuilder::default()
+            .batch_length(10)
+            .linger_time(IggyDuration::new(Duration::from_millis(50)))
+            .batch_size(10_000);
+        let shard = Shard::new(
+            Arc::new(mock),
+            Arc::new(AtomicUsize::new(0)),
+            Arc::new(bb.build()),
+            Arc::new(Notify::new()),
+            flume::unbounded().0,
+        );
+
+        shard
+            .send(ShardMessage {
+                stream: dummy_identifier(),
+                topic: dummy_identifier(),
+                messages: vec![dummy_message(1)],
+                partitioning: None,
+            })
+            .await
+            .unwrap();
+
+        sleep(Duration::from_millis(100)).await;
+    }
+
+    #[tokio::test]
+    async fn test_shard_forwards_error() {
+        let mut mock = MockProducerCoreBackend::new();
+        let error = IggyError::ProducerSendFailed {
+            failed: Arc::new(vec![dummy_message(1)]),
+            cause: "test error".into(),
+        };
+
+        mock.expect_send_internal().returning(move |_, _, _, _| {
+            let err = error.clone();
+            Box::pin(async move { Err(err) })
+        });
+
+        let (err_tx, err_rx) = flume::unbounded();
+        let bb = BackgroundBuilder::default();
+        let shard = Shard::new(
+            Arc::new(mock),
+            Arc::new(AtomicUsize::new(0)),
+            Arc::new(bb.build()),
+            Arc::new(Notify::new()),
+            err_tx,
+        );
+
+        shard.send(ShardMessage {
+            stream: dummy_identifier(),
+            topic: dummy_identifier(),
+            messages: vec![dummy_message(1)],
+            partitioning: None,
+        }).await.unwrap();
+
+        let err_ctx = err_rx.recv_async().await.unwrap();
+        assert_eq!(err_ctx.cause, "test error");
+        assert_eq!(err_ctx.messages.len(), 1);
+    }
+
+    #[tokio::test]
+    async fn test_shard_send_error_on_closed_channel() {
+        let (tx, rx) = flume::bounded::<ShardMessage>(1);
+
+        drop(rx);
+
+        let shard = Shard {
+            tx,
+            _handle: tokio::spawn(async {}),
+        };
+
+        let result = shard.send(ShardMessage {
+            stream: dummy_identifier(),
+            topic: dummy_identifier(),
+            messages: vec![dummy_message(1)],
+            partitioning: None,
+        }).await;
+
+        assert!(matches!(result, Err(IggyError::BackgroundSendError)));
+    }
+}
diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs
index 9b13f218..3d87cc82 100644
--- a/core/sdk/src/prelude.rs
+++ b/core/sdk/src/prelude.rs
@@ -35,7 +35,7 @@ pub use crate::clients::consumer::{
     AutoCommit, AutoCommitAfter, AutoCommitWhen, IggyConsumer, ReceivedMessage,
 };
 pub use crate::clients::consumer_builder::IggyConsumerBuilder;
-pub use crate::clients::producer::{IggyProducer, ErrorCallback};
+pub use crate::clients::producer::IggyProducer;
 pub use crate::clients::producer_builder::IggyProducerBuilder;
 pub use crate::consumer_ext::IggyConsumerMessageExt;
 pub use crate::stream_builder::IggyConsumerConfig;
diff --git a/core/sdk/src/stream_builder/build/build_iggy_producer.rs 
b/core/sdk/src/stream_builder/build/build_iggy_producer.rs
index 85409c5f..eb6b2167 100644
--- a/core/sdk/src/stream_builder/build/build_iggy_producer.rs
+++ b/core/sdk/src/stream_builder/build/build_iggy_producer.rs
@@ -17,9 +17,7 @@
  */
 
 use crate::clients::client::IggyClient;
-use crate::clients::producer::{ErrorCallback, IggyProducer};
-use crate::clients::producer_builder::{SendMode, SyncBuilder};
-use crate::clients::producer_dispatcher::Sharding;
+use crate::clients::producer::IggyProducer;
 use crate::prelude::{IggyError, IggyExpiry, MaxTopicSize};
 use crate::stream_builder::IggyProducerConfig;
 use tracing::{error, trace};
diff --git a/core/sdk/src/stream_builder/iggy_stream_producer.rs 
b/core/sdk/src/stream_builder/iggy_stream_producer.rs
index 0a7174ce..31669518 100644
--- a/core/sdk/src/stream_builder/iggy_stream_producer.rs
+++ b/core/sdk/src/stream_builder/iggy_stream_producer.rs
@@ -17,8 +17,7 @@
  */
 
 use crate::clients::client::IggyClient;
-use crate::clients::producer::{IggyProducer, ErrorCallback};
-use crate::clients::producer_dispatcher::Sharding;
+use crate::clients::producer::IggyProducer;
 use crate::prelude::{IggyError, SystemClient};
 use crate::stream_builder::{IggyProducerConfig, build};
 use tracing::trace;


Reply via email to