This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch feat/add-background-send
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 41e924464df29c9e0470403b226578261b7e65b4
Author: haze518 <[email protected]>
AuthorDate: Thu Jun 5 14:01:00 2025 +0600

    add background send option
---
 Cargo.lock                                         |   2 +
 Cargo.toml                                         |   4 -
 core/common/src/error/iggy_error.rs                |  21 +-
 core/common/src/types/message/iggy_message.rs      |   2 +-
 core/connectors/runtime/src/configs.rs             |   4 +-
 core/connectors/runtime/src/sink.rs                |   2 +-
 core/connectors/runtime/src/source.rs              |  11 +-
 core/examples/src/multi-tenant/producer/main.rs    |   8 +-
 core/examples/src/new-sdk/producer/main.rs         |   9 +-
 core/examples/src/sink-data-producer/main.rs       |   8 +-
 core/sdk/Cargo.toml                                |   4 +
 core/sdk/src/clients/mod.rs                        |   6 +-
 core/sdk/src/clients/producer.rs                   | 523 +++++++++++----------
 core/sdk/src/clients/producer_builder.rs           | 299 ++++++++++--
 core/sdk/src/clients/producer_config.rs            |  53 +++
 core/sdk/src/clients/producer_dispatcher.rs        | 447 ++++++++++++++++++
 core/sdk/src/clients/producer_error_callback.rs    |  67 +++
 core/sdk/src/clients/producer_sharding.rs          | 437 +++++++++++++++++
 .../stream_builder/build/build_iggy_producer.rs    |   7 +-
 19 files changed, 1601 insertions(+), 313 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index b1556a42..8ef9cd57 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3591,6 +3591,8 @@ dependencies = [
  "futures-util",
  "iggy_binary_protocol",
  "iggy_common",
+ "mockall",
+ "num_cpus",
  "quinn",
  "reqwest",
  "reqwest-middleware",
diff --git a/Cargo.toml b/Cargo.toml
index 1f2053f7..d9ea29f0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -80,7 +80,6 @@ comfy-table = "7.1.4"
 crc32fast = "1.4.2"
 crossbeam = "0.8.4"
 dashmap = "6.1.0"
-derive_builder = "0.20.2"
 derive_more = { version = "2.0.1", features = ["full"] }
 derive-new = "0.7.0"
 dirs = "6.0.0"
@@ -171,7 +170,4 @@ iggy = { path = "core/sdk", version = "0.7.0" }
 server = { path = "core/server" }
 integration = { path = "core/integration" }
 bench-report = { path = "core/bench/report" }
-bench-runner = { path = "core/bench/runner" }
-bench-dashboard-frontend = { path = "core/bench/dashboard/frontend" }
-bench-dashboard-server = { path = "core/bench/dashboard/server" }
 bench-dashboard-shared = { path = "core/bench/dashboard/shared" }
diff --git a/core/common/src/error/iggy_error.rs 
b/core/common/src/error/iggy_error.rs
index b293270b..45917460 100644
--- a/core/common/src/error/iggy_error.rs
+++ b/core/common/src/error/iggy_error.rs
@@ -16,8 +16,10 @@
  * under the License.
  */
 
-use crate::utils::byte_size::IggyByteSize;
+use std::sync::Arc;
+
 use crate::utils::topic_size::MaxTopicSize;
+use crate::{IggyMessage, utils::byte_size::IggyByteSize};
 
 use strum::{EnumDiscriminants, FromRepr, IntoStaticStr};
 use thiserror::Error;
@@ -363,6 +365,23 @@ pub enum IggyError {
     TooSmallMessage(u32, u32) = 4037,
     #[error("Cannot sed messages due to client disconnection")]
     CannotSendMessagesDueToClientDisconnection = 4050,
+    #[error("Background send error")]
+    BackgroundSendError = 4051,
+    #[error("Background send timeout")]
+    BackgroundSendTimeout = 4052,
+    #[error("Background send buffer is full")]
+    BackgroundSendBufferFull = 4053,
+    #[error("Background worker disconnected")]
+    BackgroundWorkerDisconnected = 4054,
+    #[error("Background send buffer overflow")]
+    BackgroundSendBufferOverflow = 4055,
+    #[error("Producer send failed")]
+    ProducerSendFailed {
+        cause: String,
+        failed: Arc<Vec<IggyMessage>>,
+    } = 4056,
+    #[error("Producer closed")]
+    ProducerClosed = 4057,
     #[error("Invalid offset: {0}")]
     InvalidOffset(u64) = 4100,
     #[error("Consumer group with ID: {0} for topic with ID: {1} was not 
found.")]
diff --git a/core/common/src/types/message/iggy_message.rs 
b/core/common/src/types/message/iggy_message.rs
index 54c3a841..ddd4076f 100644
--- a/core/common/src/types/message/iggy_message.rs
+++ b/core/common/src/types/message/iggy_message.rs
@@ -105,7 +105,7 @@ pub const MAX_USER_HEADERS_SIZE: u32 = 100 * 1000;
 ///     .build()
 ///     .unwrap();
 /// ```
-#[derive(Debug, PartialEq)]
+#[derive(Debug, PartialEq, Default)]
 pub struct IggyMessage {
     /// Message metadata
     pub header: IggyMessageHeader,
diff --git a/core/connectors/runtime/src/configs.rs 
b/core/connectors/runtime/src/configs.rs
index aa6d8fa8..aafc9a48 100644
--- a/core/connectors/runtime/src/configs.rs
+++ b/core/connectors/runtime/src/configs.rs
@@ -60,8 +60,8 @@ pub struct StreamProducerConfig {
     pub stream: String,
     pub topic: String,
     pub schema: Schema,
-    pub batch_size: Option<u32>,
-    pub send_interval: Option<String>,
+    pub batch_length: Option<u32>,
+    pub linger_time: Option<String>,
 }
 
 #[derive(Debug, Serialize, Deserialize)]
diff --git a/core/connectors/runtime/src/sink.rs 
b/core/connectors/runtime/src/sink.rs
index 17b014eb..78008011 100644
--- a/core/connectors/runtime/src/sink.rs
+++ b/core/connectors/runtime/src/sink.rs
@@ -125,7 +125,7 @@ pub async fn init(
                     .auto_join_consumer_group()
                     .polling_strategy(PollingStrategy::next())
                     .poll_interval(poll_interval)
-                    .batch_size(batch_size)
+                    .batch_length(batch_size)
                     .build();
 
                 consumer.init().await?;
diff --git a/core/connectors/runtime/src/source.rs 
b/core/connectors/runtime/src/source.rs
index 9cde82ca..ac903a6c 100644
--- a/core/connectors/runtime/src/source.rs
+++ b/core/connectors/runtime/src/source.rs
@@ -112,14 +112,13 @@ pub async fn init(
             .expect("Failed to get source plugin");
 
         for stream in config.streams {
-            let send_interval =
-                
IggyDuration::from_str(&stream.send_interval.unwrap_or("5ms".to_owned()))
+            let linger_time =
+                
IggyDuration::from_str(&stream.linger_time.unwrap_or("5ms".to_owned()))
                     .expect("Invalid send interval");
-            let batch_size = stream.batch_size.unwrap_or(1000);
-            let mut producer = iggy_client
+            let batch_length = stream.batch_length.unwrap_or(1000);
+            let producer = iggy_client
                 .producer(&stream.stream, &stream.topic)?
-                .send_interval(send_interval)
-                .batch_size(batch_size)
+                .sync(|b| 
b.batch_length(batch_length).linger_time(linger_time))
                 .build();
 
             producer.init().await?;
diff --git a/core/examples/src/multi-tenant/producer/main.rs 
b/core/examples/src/multi-tenant/producer/main.rs
index 28dc9759..5867cbb8 100644
--- a/core/examples/src/multi-tenant/producer/main.rs
+++ b/core/examples/src/multi-tenant/producer/main.rs
@@ -260,10 +260,12 @@ async fn create_producers(
     let mut producers = Vec::new();
     for topic in topics {
         for id in 1..=producers_count {
-            let mut producer = client
+            let producer = client
                 .producer(stream, topic)?
-                .batch_length(batch_length)
-                .linger_time(IggyDuration::from_str(interval).expect("Invalid 
duration"))
+                .sync(|b| {
+                    b.batch_length(batch_length)
+                        
.linger_time(IggyDuration::from_str(interval).expect("Invalid duration"))
+                })
                 .partitioning(Partitioning::balanced())
                 .create_topic_if_not_exists(
                     partitions_count,
diff --git a/core/examples/src/new-sdk/producer/main.rs 
b/core/examples/src/new-sdk/producer/main.rs
index 52f60e7c..c30f3e4d 100644
--- a/core/examples/src/new-sdk/producer/main.rs
+++ b/core/examples/src/new-sdk/producer/main.rs
@@ -43,10 +43,13 @@ async fn main() -> anyhow::Result<(), Box<dyn Error>> {
     let client = client_provider::get_raw_client(client_provider_config, 
false).await?;
     let client = IggyClient::builder().with_client(client).build()?;
     client.connect().await?;
-    let mut producer = client
+    let interval = IggyDuration::from_str(&args.interval)?;
+    let producer = client
         .producer(&args.stream_id, &args.topic_id)?
-        .batch_length(args.messages_per_batch)
-        .linger_time(IggyDuration::from_str(&args.interval)?)
+        .sync(|b| {
+            b.batch_length(args.messages_per_batch)
+                .linger_time(interval)
+        })
         .partitioning(Partitioning::balanced())
         .create_topic_if_not_exists(
             3,
diff --git a/core/examples/src/sink-data-producer/main.rs 
b/core/examples/src/sink-data-producer/main.rs
index 2607d955..f678bb5f 100644
--- a/core/examples/src/sink-data-producer/main.rs
+++ b/core/examples/src/sink-data-producer/main.rs
@@ -55,10 +55,12 @@ async fn main() -> Result<(), DataProducerError> {
     let stream = env::var("IGGY_STREAM").unwrap_or("qw".to_owned());
     let topic = env::var("IGGY_TOPIC").unwrap_or("records".to_owned());
     let client = create_client(&address, &username, &password).await?;
-    let mut producer = client
+    let producer = client
         .producer(&stream, &topic)?
-        .batch_size(1000)
-        .send_interval(IggyDuration::from_str("5ms").unwrap())
+        .sync(|b| {
+            b.batch_length(1000)
+                .linger_time(IggyDuration::from_str("5ms").unwrap())
+        })
         .partitioning(Partitioning::balanced())
         .build();
     producer.init().await?;
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index c763af12..ebc8f1b4 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -48,6 +48,7 @@ futures = { workspace = true }
 futures-util = { workspace = true }
 iggy_binary_protocol = { workspace = true }
 iggy_common = { workspace = true }
+num_cpus = "1.16.0"
 quinn = { workspace = true }
 reqwest = { workspace = true }
 reqwest-middleware = { workspace = true }
@@ -59,3 +60,6 @@ tokio-rustls = { workspace = true }
 tracing = { workspace = true }
 trait-variant = { workspace = true }
 webpki-roots = { workspace = true }
+
+[dev-dependencies]
+mockall = { workspace = true }
diff --git a/core/sdk/src/clients/mod.rs b/core/sdk/src/clients/mod.rs
index be854f1b..e50bc443 100644
--- a/core/sdk/src/clients/mod.rs
+++ b/core/sdk/src/clients/mod.rs
@@ -32,6 +32,10 @@ pub mod consumer;
 pub mod consumer_builder;
 pub mod producer;
 pub mod producer_builder;
+pub mod producer_config;
+pub mod producer_dispatcher;
+pub mod producer_error_callback;
+pub mod producer_sharding;
 
 const ORDERING: std::sync::atomic::Ordering = 
std::sync::atomic::Ordering::SeqCst;
-const MAX_BATCH_SIZE: usize = 1000000;
+const MAX_BATCH_LENGTH: usize = 1000000;
diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index f3706e3a..58f38e2b 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -15,8 +15,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-use super::{MAX_BATCH_SIZE, ORDERING};
+use super::ORDERING;
 
+use crate::clients::producer_builder::SendMode;
+use crate::clients::producer_config::SyncConfig;
+use crate::clients::producer_dispatcher::ProducerDispatcher;
 use bytes::Bytes;
 use futures_util::StreamExt;
 use iggy_binary_protocol::Client;
@@ -26,27 +29,37 @@ use iggy_common::{
     IggyError, IggyExpiry, IggyMessage, IggyTimestamp, MaxTopicSize, 
Partitioner, Partitioning,
 };
 use std::sync::Arc;
+use std::sync::atomic::Ordering;
 use std::sync::atomic::{AtomicBool, AtomicU64};
 use std::time::Duration;
 use tokio::time::{Interval, sleep};
 use tracing::{error, info, trace, warn};
 
-unsafe impl Send for IggyProducer {}
-unsafe impl Sync for IggyProducer {}
+#[cfg(test)]
+use mockall::automock;
 
-pub struct IggyProducer {
-    initialized: bool,
+#[cfg_attr(test, automock)]
+pub trait ProducerCoreBackend: Send + Sync + 'static {
+    fn send_internal(
+        &self,
+        stream: &Identifier,
+        topic: &Identifier,
+        msgs: Vec<IggyMessage>,
+        partitioning: Option<Arc<Partitioning>>,
+    ) -> impl Future<Output = Result<(), IggyError>> + Send;
+}
+
+pub struct ProducerCore {
+    initialized: AtomicBool,
     can_send: Arc<AtomicBool>,
     client: Arc<IggySharedMut<Box<dyn Client>>>,
     stream_id: Arc<Identifier>,
     stream_name: String,
     topic_id: Arc<Identifier>,
     topic_name: String,
-    batch_length: Option<usize>,
     partitioning: Option<Arc<Partitioning>>,
     encryptor: Option<Arc<EncryptorKind>>,
     partitioner: Option<Arc<dyn Partitioner>>,
-    linger_time_micros: u64,
     create_stream_if_not_exists: bool,
     create_topic_if_not_exists: bool,
     topic_partitions_count: u32,
@@ -54,74 +67,15 @@ pub struct IggyProducer {
     topic_message_expiry: IggyExpiry,
     topic_max_size: MaxTopicSize,
     default_partitioning: Arc<Partitioning>,
-    can_send_immediately: bool,
     last_sent_at: Arc<AtomicU64>,
     send_retries_count: Option<u32>,
     send_retries_interval: Option<IggyDuration>,
+    sync_config: Option<SyncConfig>,
 }
 
-impl IggyProducer {
-    #[allow(clippy::too_many_arguments)]
-    pub(crate) fn new(
-        client: IggySharedMut<Box<dyn Client>>,
-        stream: Identifier,
-        stream_name: String,
-        topic: Identifier,
-        topic_name: String,
-        batch_length: Option<usize>,
-        partitioning: Option<Partitioning>,
-        encryptor: Option<Arc<EncryptorKind>>,
-        partitioner: Option<Arc<dyn Partitioner>>,
-        interval: Option<IggyDuration>,
-        create_stream_if_not_exists: bool,
-        create_topic_if_not_exists: bool,
-        topic_partitions_count: u32,
-        topic_replication_factor: Option<u8>,
-        topic_message_expiry: IggyExpiry,
-        topic_max_size: MaxTopicSize,
-        send_retries_count: Option<u32>,
-        send_retries_interval: Option<IggyDuration>,
-    ) -> Self {
-        Self {
-            initialized: false,
-            client: Arc::new(client),
-            can_send: Arc::new(AtomicBool::new(true)),
-            stream_id: Arc::new(stream),
-            stream_name,
-            topic_id: Arc::new(topic),
-            topic_name,
-            batch_length,
-            partitioning: partitioning.map(Arc::new),
-            encryptor,
-            partitioner,
-            linger_time_micros: interval.map_or(0, |i| i.as_micros()),
-            create_stream_if_not_exists,
-            create_topic_if_not_exists,
-            topic_partitions_count,
-            topic_replication_factor,
-            topic_message_expiry,
-            topic_max_size,
-            default_partitioning: Arc::new(Partitioning::balanced()),
-            can_send_immediately: interval.is_none(),
-            last_sent_at: Arc::new(AtomicU64::new(0)),
-            send_retries_count,
-            send_retries_interval,
-        }
-    }
-
-    pub fn stream(&self) -> &Identifier {
-        &self.stream_id
-    }
-
-    pub fn topic(&self) -> &Identifier {
-        &self.topic_id
-    }
-
-    /// Initializes the producer by subscribing to diagnostic events, creating 
the stream and topic if they do not exist etc.
-    ///
-    /// Note: This method must be invoked before producing messages.
-    pub async fn init(&mut self) -> Result<(), IggyError> {
-        if self.initialized {
+impl ProducerCore {
+    pub async fn init(&self) -> Result<(), IggyError> {
+        if self.initialized.load(Ordering::SeqCst) {
             return Ok(());
         }
 
@@ -179,7 +133,9 @@ impl IggyProducer {
                 .await?;
         }
 
-        self.initialized = true;
+        let _ = self
+            .initialized
+            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst);
         info!("Producer has been initialized for stream: {stream_id} and 
topic: {topic_id}.");
         Ok(())
     }
@@ -221,171 +177,6 @@ impl IggyProducer {
         });
     }
 
-    pub async fn send(&self, messages: Vec<IggyMessage>) -> Result<(), 
IggyError> {
-        if messages.is_empty() {
-            trace!("No messages to send.");
-            return Ok(());
-        }
-
-        if self.can_send_immediately {
-            return self
-                .send_immediately(&self.stream_id, &self.topic_id, messages, 
None)
-                .await;
-        }
-
-        self.send_buffered(
-            self.stream_id.clone(),
-            self.topic_id.clone(),
-            messages,
-            None,
-        )
-        .await
-    }
-
-    pub async fn send_one(&self, message: IggyMessage) -> Result<(), 
IggyError> {
-        self.send(vec![message]).await
-    }
-
-    pub async fn send_with_partitioning(
-        &self,
-        messages: Vec<IggyMessage>,
-        partitioning: Option<Arc<Partitioning>>,
-    ) -> Result<(), IggyError> {
-        if messages.is_empty() {
-            trace!("No messages to send.");
-            return Ok(());
-        }
-
-        if self.can_send_immediately {
-            return self
-                .send_immediately(&self.stream_id, &self.topic_id, messages, 
partitioning)
-                .await;
-        }
-
-        self.send_buffered(
-            self.stream_id.clone(),
-            self.topic_id.clone(),
-            messages,
-            partitioning,
-        )
-        .await
-    }
-
-    pub async fn send_to(
-        &self,
-        stream: Arc<Identifier>,
-        topic: Arc<Identifier>,
-        messages: Vec<IggyMessage>,
-        partitioning: Option<Arc<Partitioning>>,
-    ) -> Result<(), IggyError> {
-        if messages.is_empty() {
-            trace!("No messages to send.");
-            return Ok(());
-        }
-
-        if self.can_send_immediately {
-            return self
-                .send_immediately(&self.stream_id, &self.topic_id, messages, 
partitioning)
-                .await;
-        }
-
-        self.send_buffered(stream, topic, messages, partitioning)
-            .await
-    }
-
-    async fn send_buffered(
-        &self,
-        stream: Arc<Identifier>,
-        topic: Arc<Identifier>,
-        mut messages: Vec<IggyMessage>,
-        partitioning: Option<Arc<Partitioning>>,
-    ) -> Result<(), IggyError> {
-        self.encrypt_messages(&mut messages)?;
-        let partitioning = self.get_partitioning(&stream, &topic, &messages, 
partitioning)?;
-        let batch_length = self.batch_length.unwrap_or(MAX_BATCH_SIZE);
-        let batches = messages.chunks_mut(batch_length);
-        let mut current_batch = 1;
-        let batches_count = batches.len();
-        for batch in batches {
-            if self.linger_time_micros > 0 {
-                Self::wait_before_sending(
-                    self.linger_time_micros,
-                    self.last_sent_at.load(ORDERING),
-                )
-                .await;
-            }
-
-            let messages_count = batch.len();
-            trace!(
-                "Sending {messages_count} messages 
({current_batch}/{batches_count} batch(es))..."
-            );
-            self.last_sent_at
-                .store(IggyTimestamp::now().into(), ORDERING);
-            self.try_send_messages(&self.stream_id, &self.topic_id, 
&partitioning, batch)
-                .await?;
-            trace!("Sent {messages_count} messages 
({current_batch}/{batches_count} batch(es)).");
-            current_batch += 1;
-        }
-        Ok(())
-    }
-
-    async fn send_immediately(
-        &self,
-        stream: &Identifier,
-        topic: &Identifier,
-        mut messages: Vec<IggyMessage>,
-        partitioning: Option<Arc<Partitioning>>,
-    ) -> Result<(), IggyError> {
-        trace!("No batch size specified, sending messages immediately.");
-        self.encrypt_messages(&mut messages)?;
-        let partitioning = self.get_partitioning(stream, topic, &messages, 
partitioning)?;
-        let batch_length = self.batch_length.unwrap_or(MAX_BATCH_SIZE);
-        if messages.len() <= batch_length {
-            self.last_sent_at
-                .store(IggyTimestamp::now().into(), ORDERING);
-            self.try_send_messages(stream, topic, &partitioning, &mut messages)
-                .await?;
-            return Ok(());
-        }
-
-        for batch in messages.chunks_mut(batch_length) {
-            self.last_sent_at
-                .store(IggyTimestamp::now().into(), ORDERING);
-            self.try_send_messages(stream, topic, &partitioning, batch)
-                .await?;
-        }
-        Ok(())
-    }
-
-    async fn wait_before_sending(interval: u64, last_sent_at: u64) {
-        if interval == 0 {
-            return;
-        }
-
-        let now: u64 = IggyTimestamp::now().into();
-        let elapsed = now - last_sent_at;
-        if elapsed >= interval {
-            trace!("No need to wait before sending messages. {now} - 
{last_sent_at} = {elapsed}");
-            return;
-        }
-
-        let remaining = interval - elapsed;
-        trace!(
-            "Waiting for {remaining} microseconds before sending messages... 
{interval} - {elapsed} = {remaining}"
-        );
-        sleep(Duration::from_micros(remaining)).await;
-    }
-
-    fn encrypt_messages(&self, messages: &mut [IggyMessage]) -> Result<(), 
IggyError> {
-        if let Some(encryptor) = &self.encryptor {
-            for message in messages {
-                message.payload = 
Bytes::from(encryptor.encrypt(&message.payload)?);
-                message.header.payload_length = message.payload.len() as u32;
-            }
-        }
-        Ok(())
-    }
-
     async fn try_send_messages(
         &self,
         stream: &Identifier,
@@ -505,6 +296,16 @@ impl IggyProducer {
         }
     }
 
+    fn encrypt_messages(&self, messages: &mut [IggyMessage]) -> Result<(), 
IggyError> {
+        if let Some(encryptor) = &self.encryptor {
+            for message in messages {
+                message.payload = 
Bytes::from(encryptor.encrypt(&message.payload)?);
+                message.header.payload_length = message.payload.len() as u32;
+            }
+        }
+        Ok(())
+    }
+
     fn get_partitioning(
         &self,
         stream: &Identifier,
@@ -525,4 +326,254 @@ impl IggyProducer {
             }))
         }
     }
+
+    async fn wait_before_sending(interval: u64, last_sent_at: u64) {
+        if interval == 0 {
+            return;
+        }
+
+        let now: u64 = IggyTimestamp::now().into();
+        let elapsed = now - last_sent_at;
+        if elapsed >= interval {
+            trace!("No need to wait before sending messages. {now} - 
{last_sent_at} = {elapsed}");
+            return;
+        }
+
+        let remaining = interval - elapsed;
+        trace!(
+            "Waiting for {remaining} microseconds before sending messages... 
{interval} - {elapsed} = {remaining}"
+        );
+        sleep(Duration::from_micros(remaining)).await;
+    }
+}
+
+impl ProducerCoreBackend for ProducerCore {
+    async fn send_internal(
+        &self,
+        stream: &Identifier,
+        topic: &Identifier,
+        mut msgs: Vec<IggyMessage>,
+        partitioning: Option<Arc<Partitioning>>,
+    ) -> Result<(), IggyError> {
+        if msgs.is_empty() {
+            return Ok(());
+        }
+
+        if let Err(err) = self.encrypt_messages(&mut msgs) {
+            return Err(IggyError::ProducerSendFailed {
+                cause: err.to_string(),
+                failed: Arc::new(msgs),
+            });
+        }
+
+        let part = match self.get_partitioning(stream, topic, &msgs, 
partitioning.clone()) {
+            Ok(p) => p,
+            Err(err) => {
+                return Err(IggyError::ProducerSendFailed {
+                    cause: err.to_string(),
+                    failed: Arc::new(msgs),
+                });
+            }
+        };
+
+        match &self.sync_config {
+            Some(cfg) => {
+                let linger_time_micros = match cfg.linger_time {
+                    Some(t) => t.as_micros(),
+                    None => 0,
+                };
+                if linger_time_micros > 0 {
+                    Self::wait_before_sending(linger_time_micros, 
self.last_sent_at.load(ORDERING))
+                        .await;
+                }
+
+                let max = cfg.batch_length;
+                let mut index = 0;
+                while index < msgs.len() {
+                    let end = (index + max).min(msgs.len());
+                    let chunk = &mut msgs[index..end];
+
+                    if let Err(err) = self.try_send_messages(stream, topic, 
&part, chunk).await {
+                        let failed_tail = msgs.split_off(index);
+                        return Err(IggyError::ProducerSendFailed {
+                            cause: err.to_string(),
+                            failed: Arc::new(failed_tail),
+                        });
+                    }
+                    self.last_sent_at
+                        .store(IggyTimestamp::now().into(), ORDERING);
+                    index = end;
+                }
+            }
+            // background send on
+            _ => {
+                self.try_send_messages(stream, topic, &part, &mut msgs)
+                    .await
+                    .map_err(|err| IggyError::ProducerSendFailed {
+                        cause: err.to_string(),
+                        failed: Arc::new(msgs),
+                    })?;
+                self.last_sent_at
+                    .store(IggyTimestamp::now().into(), ORDERING);
+            }
+        }
+
+        Ok(())
+    }
+}
+
+unsafe impl Send for IggyProducer {}
+unsafe impl Sync for IggyProducer {}
+
+pub struct IggyProducer {
+    core: Arc<ProducerCore>,
+    dispatcher: Option<ProducerDispatcher>,
+}
+
+impl IggyProducer {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        client: IggySharedMut<Box<dyn Client>>,
+        stream: Identifier,
+        stream_name: String,
+        topic: Identifier,
+        topic_name: String,
+        partitioning: Option<Partitioning>,
+        encryptor: Option<Arc<EncryptorKind>>,
+        partitioner: Option<Arc<dyn Partitioner>>,
+        create_stream_if_not_exists: bool,
+        create_topic_if_not_exists: bool,
+        topic_partitions_count: u32,
+        topic_replication_factor: Option<u8>,
+        topic_message_expiry: IggyExpiry,
+        topic_max_size: MaxTopicSize,
+        send_retries_count: Option<u32>,
+        send_retries_interval: Option<IggyDuration>,
+        mode: SendMode,
+    ) -> Self {
+        let core = Arc::new(ProducerCore {
+            initialized: AtomicBool::new(true),
+            client: Arc::new(client),
+            can_send: Arc::new(AtomicBool::new(true)),
+            stream_id: Arc::new(stream),
+            stream_name,
+            topic_id: Arc::new(topic),
+            topic_name,
+            partitioning: partitioning.map(Arc::new),
+            encryptor,
+            partitioner,
+            create_stream_if_not_exists,
+            create_topic_if_not_exists,
+            topic_partitions_count,
+            topic_replication_factor,
+            topic_message_expiry,
+            topic_max_size,
+            default_partitioning: Arc::new(Partitioning::balanced()),
+            last_sent_at: Arc::new(AtomicU64::new(0)),
+            send_retries_count,
+            send_retries_interval,
+            sync_config: match mode {
+                SendMode::Sync(ref cfg) => Some(cfg.clone()),
+                _ => None,
+            },
+        });
+        let dispatcher = match mode {
+            SendMode::Background(cfg) => 
Some(ProducerDispatcher::new(core.clone(), cfg)),
+            _ => None,
+        };
+
+        Self { core, dispatcher }
+    }
+
+    pub fn stream(&self) -> &Identifier {
+        &self.core.stream_id
+    }
+
+    pub fn topic(&self) -> &Identifier {
+        &self.core.topic_id
+    }
+
+    /// Initializes the producer by subscribing to diagnostic events, creating 
the stream and topic if they do not exist etc.
+    ///
+    /// Note: This method must be invoked before producing messages.
+    pub async fn init(&self) -> Result<(), IggyError> {
+        self.core.init().await
+    }
+
+    pub async fn send(&self, messages: Vec<IggyMessage>) -> Result<(), 
IggyError> {
+        if messages.is_empty() {
+            trace!("No messages to send.");
+            return Ok(());
+        }
+
+        let stream_id = self.core.stream_id.clone();
+        let topic_id = self.core.topic_id.clone();
+
+        match &self.dispatcher {
+            Some(disp) => disp.dispatch(messages, stream_id, topic_id, 
None).await,
+            None => {
+                self.core
+                    .send_internal(&stream_id, &topic_id, messages, None)
+                    .await
+            }
+        }
+    }
+
+    pub async fn send_one(&self, message: IggyMessage) -> Result<(), 
IggyError> {
+        self.send(vec![message]).await
+    }
+
+    pub async fn send_with_partitioning(
+        &self,
+        messages: Vec<IggyMessage>,
+        partitioning: Option<Arc<Partitioning>>,
+    ) -> Result<(), IggyError> {
+        if messages.is_empty() {
+            trace!("No messages to send.");
+            return Ok(());
+        }
+
+        let stream_id = self.core.stream_id.clone();
+        let topic_id = self.core.topic_id.clone();
+
+        match &self.dispatcher {
+            Some(disp) => {
+                disp.dispatch(messages, stream_id, topic_id, partitioning)
+                    .await
+            }
+            None => {
+                self.core
+                    .send_internal(&stream_id, &topic_id, messages, 
partitioning)
+                    .await
+            }
+        }
+    }
+
+    pub async fn send_to(
+        &self,
+        stream: Arc<Identifier>,
+        topic: Arc<Identifier>,
+        messages: Vec<IggyMessage>,
+        partitioning: Option<Arc<Partitioning>>,
+    ) -> Result<(), IggyError> {
+        if messages.is_empty() {
+            trace!("No messages to send.");
+            return Ok(());
+        }
+
+        match &self.dispatcher {
+            Some(disp) => disp.dispatch(messages, stream, topic, 
partitioning).await,
+            None => {
+                self.core
+                    .send_internal(&stream, &topic, messages, partitioning)
+                    .await
+            }
+        }
+    }
+
+    pub async fn shutdown(self) {
+        if let Some(disp) = self.dispatcher {
+            disp.shutdown().await;
+        }
+    }
 }
diff --git a/core/sdk/src/clients/producer_builder.rs 
b/core/sdk/src/clients/producer_builder.rs
index 76ecfeea..431f94ee 100644
--- a/core/sdk/src/clients/producer_builder.rs
+++ b/core/sdk/src/clients/producer_builder.rs
@@ -15,27 +15,234 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use super::MAX_BATCH_SIZE;
+use super::MAX_BATCH_LENGTH;
+use crate::clients::producer_config::{BackgroundConfig, BackpressureMode, 
SyncConfig};
+use crate::clients::producer_error_callback::ErrorCallback;
+use crate::clients::producer_error_callback::LogErrorCallback;
+use crate::clients::producer_sharding::{BalancedSharding, Sharding};
 use crate::prelude::IggyProducer;
 use iggy_binary_protocol::Client;
 use iggy_common::locking::IggySharedMut;
 use iggy_common::{
-    EncryptorKind, Identifier, IggyDuration, IggyExpiry, MaxTopicSize, 
Partitioner, Partitioning,
+    EncryptorKind, Identifier, IggyByteSize, IggyDuration, IggyExpiry, 
MaxTopicSize, Partitioner,
+    Partitioning,
 };
 use std::sync::Arc;
 
-#[derive(Debug)]
+pub struct BackgroundBuilder {
+    num_shards: Option<usize>,
+    batch_size: Option<usize>,
+    batch_length: Option<usize>,
+    failure_mode: Option<BackpressureMode>,
+    max_buffer_size: Option<IggyByteSize>,
+    linger_time: Option<IggyDuration>,
+    max_in_flight: Option<usize>,
+
+    error_callback: Box<dyn ErrorCallback>,
+    sharding: Box<dyn Sharding>,
+}
+
+impl Default for BackgroundBuilder {
+    fn default() -> Self {
+        let num_shards = default_shard_count();
+        BackgroundBuilder {
+            num_shards: Some(num_shards),
+            sharding: Box::new(BalancedSharding::default()),
+            error_callback: Box::new(LogErrorCallback),
+            batch_size: Some(1_048_576),
+            batch_length: Some(1000),
+            failure_mode: Some(BackpressureMode::Block),
+            max_buffer_size: Some(IggyByteSize::from(32 * 1_048_576)),
+            linger_time: Some(IggyDuration::from(1000)),
+            max_in_flight: Some(num_shards * num_shards),
+        }
+    }
+}
+
+impl BackgroundBuilder {
+    /// Sets the number of messages to batch before sending them, can be 
combined with `interval`.
+    pub fn batch_length(self, batch_length: u32) -> Self {
+        Self {
+            batch_length: if batch_length == 0 {
+                None
+            } else {
+                Some(batch_length.min(MAX_BATCH_LENGTH as u32) as usize)
+            },
+            ..self
+        }
+    }
+
+    /// Clears the batch size.
+    pub fn without_batch_length(self) -> Self {
+        Self {
+            batch_length: None,
+            ..self
+        }
+    }
+
+    /// Sets the interval between sending the messages, can be combined with 
`batch_length`.
+    pub fn linger_time(self, interval: IggyDuration) -> Self {
+        Self {
+            linger_time: Some(interval),
+            ..self
+        }
+    }
+
+    /// Clears the interval.
+    pub fn without_linger_time(self) -> Self {
+        Self {
+            linger_time: None,
+            ..self
+        }
+    }
+
+    /// Sets the number of shards (background workers).
+    pub fn num_shards(self, value: usize) -> Self {
+        Self {
+            num_shards: Some(value),
+            ..self
+        }
+    }
+
+    /// Sets the maximum size of a batch in bytes.
+    pub fn batch_size(self, value: usize) -> Self {
+        Self {
+            batch_size: Some(value),
+            ..self
+        }
+    }
+
+    /// Sets the sharding strategy.
+    /// You can pass a custom implementation that implements the `Sharding` 
trait.
+    pub fn sharding(self, sharding: Box<dyn Sharding>) -> Self {
+        Self { sharding, ..self }
+    }
+
+    /// Sets the maximum buffer size for all in-flight messages (in bytes).
+    pub fn max_buffer_size(self, value: IggyByteSize) -> Self {
+        Self {
+            max_buffer_size: Some(value),
+            ..self
+        }
+    }
+
+    /// Sets the failure mode behavior (e.g., block, fail immediately, 
timeout).
+    pub fn failure_mode(self, mode: BackpressureMode) -> Self {
+        Self {
+            failure_mode: Some(mode),
+            ..self
+        }
+    }
+
+    /// Sets the error callback for handling background sending errors.
+    pub fn error_callback(self, callback: Box<dyn ErrorCallback>) -> Self {
+        Self {
+            error_callback: callback,
+            ..self
+        }
+    }
+
+    /// Sets the maximum number of in-flight batches/messages.
+    pub fn max_in_flight(self, value: usize) -> Self {
+        Self {
+            max_in_flight: Some(value),
+            ..self
+        }
+    }
+
+    pub fn build(self) -> BackgroundConfig {
+        BackgroundConfig {
+            num_shards: self.num_shards.unwrap_or(8),
+            batch_size: self.batch_size,
+            batch_length: self.batch_length,
+            failure_mode: self.failure_mode.unwrap_or(BackpressureMode::Block),
+            max_buffer_size: self.max_buffer_size,
+            linger_time: self.linger_time.unwrap_or(IggyDuration::from(1000)),
+            error_callback: Arc::new(self.error_callback),
+            sharding: self.sharding,
+            max_in_flight: self.max_in_flight,
+        }
+    }
+}
+
+pub struct SyncBuilder {
+    batch_length: Option<usize>,
+    linger_time: Option<IggyDuration>,
+}
+
+impl Default for SyncBuilder {
+    fn default() -> Self {
+        Self {
+            batch_length: Some(1000),
+            linger_time: Some(IggyDuration::from(1000)),
+        }
+    }
+}
+
+impl SyncBuilder {
+    /// Sets the number of messages to batch before sending them, can be 
combined with `interval`.
+    pub fn batch_length(self, batch_length: u32) -> Self {
+        Self {
+            batch_length: if batch_length == 0 {
+                None
+            } else {
+                Some(batch_length.min(MAX_BATCH_LENGTH as u32) as usize)
+            },
+            ..self
+        }
+    }
+
+    /// Clears the batch size.
+    pub fn without_batch_length(self) -> Self {
+        Self {
+            batch_length: None,
+            ..self
+        }
+    }
+
+    /// Sets the interval between sending the messages, can be combined with 
`batch_length`.
+    pub fn linger_time(self, interval: IggyDuration) -> Self {
+        Self {
+            linger_time: Some(interval),
+            ..self
+        }
+    }
+
+    /// Clears the interval.
+    pub fn without_linger_time(self) -> Self {
+        Self {
+            linger_time: None,
+            ..self
+        }
+    }
+
+    pub fn build(self) -> SyncConfig {
+        SyncConfig {
+            batch_length: self.batch_length.unwrap_or(MAX_BATCH_LENGTH),
+            linger_time: self.linger_time,
+        }
+    }
+}
+
+pub enum SendMode {
+    Sync(SyncConfig),
+    Background(BackgroundConfig),
+}
+
+impl Default for SendMode {
+    fn default() -> Self {
+        SendMode::Sync(SyncBuilder::default().build())
+    }
+}
+
 pub struct IggyProducerBuilder {
     client: IggySharedMut<Box<dyn Client>>,
     stream: Identifier,
     stream_name: String,
     topic: Identifier,
     topic_name: String,
-    batch_length: Option<usize>,
-    partitioning: Option<Partitioning>,
     encryptor: Option<Arc<EncryptorKind>>,
     partitioner: Option<Arc<dyn Partitioner>>,
-    linger_time: Option<IggyDuration>,
     create_stream_if_not_exists: bool,
     create_topic_if_not_exists: bool,
     topic_partitions_count: u32,
@@ -44,6 +251,8 @@ pub struct IggyProducerBuilder {
     send_retries_interval: Option<IggyDuration>,
     topic_message_expiry: IggyExpiry,
     topic_max_size: MaxTopicSize,
+    partitioning: Option<Partitioning>,
+    mode: SendMode,
 }
 
 impl IggyProducerBuilder {
@@ -63,11 +272,9 @@ impl IggyProducerBuilder {
             stream_name,
             topic,
             topic_name,
-            batch_length: Some(1000),
             partitioning: None,
             encryptor,
             partitioner,
-            linger_time: Some(IggyDuration::from(1000)),
             create_stream_if_not_exists: true,
             create_topic_if_not_exists: true,
             topic_partitions_count: 1,
@@ -76,6 +283,7 @@ impl IggyProducerBuilder {
             topic_max_size: MaxTopicSize::ServerDefault,
             send_retries_count: Some(3),
             send_retries_interval: Some(IggyDuration::ONE_SECOND),
+            mode: SendMode::default(),
         }
     }
 
@@ -89,42 +297,6 @@ impl IggyProducerBuilder {
         Self { topic, ..self }
     }
 
-    /// Sets the number of messages to batch before sending them, can be 
combined with `interval`.
-    pub fn batch_length(self, batch_length: u32) -> Self {
-        Self {
-            batch_length: if batch_length == 0 {
-                None
-            } else {
-                Some(batch_length.min(MAX_BATCH_SIZE as u32) as usize)
-            },
-            ..self
-        }
-    }
-
-    /// Clears the batch size.
-    pub fn without_batch_length(self) -> Self {
-        Self {
-            batch_length: None,
-            ..self
-        }
-    }
-
-    /// Sets the interval between sending the messages, can be combined with 
`batch_length`.
-    pub fn linger_time(self, interval: IggyDuration) -> Self {
-        Self {
-            linger_time: Some(interval),
-            ..self
-        }
-    }
-
-    /// Clears the interval.
-    pub fn without_linger_time(self) -> Self {
-        Self {
-            linger_time: None,
-            ..self
-        }
-    }
-
     /// Sets the encryptor for encrypting the messages' payloads.
     pub fn encryptor(self, encryptor: Arc<EncryptorKind>) -> Self {
         Self {
@@ -226,9 +398,36 @@ impl IggyProducerBuilder {
         }
     }
 
-    /// Builds the producer.
+    /// Configures the producer to use synchronous (immediate) sending mode.
+    ///
+    /// In sync mode, messages are sent immediately on `.send()` without 
background buffering.
+    ///
+    /// # Arguments
+    /// * `f` - A closure that modifies the `SyncBuilder` configuration.
+    pub fn sync<F>(mut self, f: F) -> Self
+    where
+        F: FnOnce(SyncBuilder) -> SyncBuilder,
+    {
+        let cfg = f(SyncBuilder::default()).build();
+        self.mode = SendMode::Sync(cfg);
+        self
+    }
+
+    /// Configures the producer to use background (asynchronous) sending mode.
     ///
-    /// Note: After building the producer, `init()` must be invoked before 
producing messages.
+    /// In background mode, messages are buffered and sent in batches via 
background tasks.
+    ///
+    /// # Arguments
+    /// * `f` - A closure that modifies the `BackgroundBuilder` configuration.
+    pub fn background<F>(mut self, f: F) -> Self
+    where
+        F: FnOnce(BackgroundBuilder) -> BackgroundBuilder,
+    {
+        let cfg = f(BackgroundBuilder::default()).build();
+        self.mode = SendMode::Background(cfg);
+        self
+    }
+
     pub fn build(self) -> IggyProducer {
         IggyProducer::new(
             self.client,
@@ -236,11 +435,9 @@ impl IggyProducerBuilder {
             self.stream_name,
             self.topic,
             self.topic_name,
-            self.batch_length,
             self.partitioning,
             self.encryptor,
             self.partitioner,
-            self.linger_time,
             self.create_stream_if_not_exists,
             self.create_topic_if_not_exists,
             self.topic_partitions_count,
@@ -249,6 +446,12 @@ impl IggyProducerBuilder {
             self.topic_max_size,
             self.send_retries_count,
             self.send_retries_interval,
+            self.mode,
         )
     }
 }
+
+fn default_shard_count() -> usize {
+    let cpus = num_cpus::get();
+    cpus.clamp(2, 16)
+}
diff --git a/core/sdk/src/clients/producer_config.rs 
b/core/sdk/src/clients/producer_config.rs
new file mode 100644
index 00000000..0ba0e6ef
--- /dev/null
+++ b/core/sdk/src/clients/producer_config.rs
@@ -0,0 +1,53 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use std::sync::Arc;
+
+use iggy_common::{IggyByteSize, IggyDuration};
+
+use crate::clients::producer_error_callback::ErrorCallback;
+use crate::clients::producer_sharding::Sharding;
+
+#[derive(Debug, Clone)]
+/// Determines how the `send_messages` API should behave when problem is 
encountered
+pub enum BackpressureMode {
+    /// Block until the send succeeds
+    Block,
+    /// Block with a timeout, after which the send fails
+    BlockWithTimeout(IggyDuration),
+    /// Fail immediately without retrying
+    FailImmediately,
+}
+
+#[derive(Debug)]
+pub struct BackgroundConfig {
+    pub num_shards: usize,
+    pub batch_size: Option<usize>,
+    pub batch_length: Option<usize>,
+    pub failure_mode: BackpressureMode,
+    pub max_buffer_size: Option<IggyByteSize>,
+    pub max_in_flight: Option<usize>,
+    pub linger_time: IggyDuration,
+    pub error_callback: Arc<Box<dyn ErrorCallback + Send + Sync>>,
+    pub sharding: Box<dyn Sharding + Send + Sync>,
+}
+
+#[derive(Clone)]
+pub struct SyncConfig {
+    pub batch_length: usize,
+    pub linger_time: Option<IggyDuration>,
+}
diff --git a/core/sdk/src/clients/producer_dispatcher.rs 
b/core/sdk/src/clients/producer_dispatcher.rs
new file mode 100644
index 00000000..c56df46d
--- /dev/null
+++ b/core/sdk/src/clients/producer_dispatcher.rs
@@ -0,0 +1,447 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::clients::producer::ProducerCoreBackend;
+use crate::clients::producer_config::{BackgroundConfig, BackpressureMode};
+use crate::clients::producer_error_callback::ErrorCtx;
+use crate::clients::producer_sharding::{Shard, ShardMessage, 
ShardMessageWithPermits};
+use futures::FutureExt;
+use iggy_common::{Identifier, IggyError, IggyMessage, Partitioning, Sizeable};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use tokio::sync::Semaphore;
+
+use tokio::{sync::Notify, task::JoinHandle};
+
+pub struct ProducerDispatcher {
+    shards: Vec<Shard>,
+    config: Arc<BackgroundConfig>,
+    closed: AtomicBool,
+    slots_permit: Arc<Semaphore>,
+    bytes_permit: Arc<Semaphore>,
+    shutdown_notify: Arc<Notify>,
+    _join_handle: JoinHandle<()>,
+}
+
+impl ProducerDispatcher {
+    pub fn new(core: Arc<impl ProducerCoreBackend>, config: BackgroundConfig) 
-> Self {
+        let mut shards = Vec::with_capacity(config.num_shards);
+        let config = Arc::new(config);
+        let shutdown_notify = Arc::new(Notify::new());
+
+        let (err_tx, err_rx) = flume::unbounded::<ErrorCtx>();
+        let err_callback = config.error_callback.clone();
+        let shutdown_notify_clone = shutdown_notify.clone();
+        let handle = tokio::spawn(async move {
+            loop {
+                tokio::select! {
+                    maybe_message = err_rx.recv_async() => {
+                        match maybe_message {
+                            Ok(ctx) => {
+                                if let Err(panic) = 
std::panic::AssertUnwindSafe(err_callback.call(ctx))
+                                    .catch_unwind()
+                                    .await
+                                {
+                                    tracing::error!("error_callback panicked: 
{:?}", panic);
+                                }
+                            }
+                            Err(_) => break
+                        }
+                    }
+                    _ = shutdown_notify_clone.notified() => {
+                        tracing::debug!("error-callback worker finished");
+                        break
+                    }
+                }
+            }
+        });
+
+        for _ in 0..config.num_shards {
+            shards.push(Shard::new(core.clone(), config.clone(), 
err_tx.clone()));
+        }
+
+        let bytes_permit = match config.max_buffer_size {
+            Some(val) => val.as_bytes_u32() as usize,
+            None => usize::MAX,
+        };
+        let slot_permit = match config.max_in_flight {
+            Some(val) => val,
+            None => usize::MAX,
+        };
+
+        Self {
+            shards,
+            config,
+            closed: AtomicBool::new(false),
+            bytes_permit: Arc::new(Semaphore::new(bytes_permit)),
+            slots_permit: Arc::new(Semaphore::new(slot_permit)),
+            shutdown_notify,
+            _join_handle: handle,
+        }
+    }
+
+    pub async fn dispatch(
+        &self,
+        messages: Vec<IggyMessage>,
+        stream: Arc<Identifier>,
+        topic: Arc<Identifier>,
+        partitioning: Option<Arc<Partitioning>>,
+    ) -> Result<(), IggyError> {
+        if self.closed.load(Ordering::Relaxed) {
+            return Err(IggyError::ProducerClosed);
+        }
+
+        let shard_message = ShardMessage {
+            messages,
+            stream,
+            topic,
+            partitioning,
+        };
+        let batch_bytes = shard_message.get_size_bytes();
+
+        let permit_bytes = match self
+            .bytes_permit
+            .clone()
+            .try_acquire_many_owned(batch_bytes.as_bytes_u32())
+        {
+            Ok(perm) => perm,
+            Err(_) => match self.config.failure_mode {
+                BackpressureMode::FailImmediately => {
+                    return Err(IggyError::BackgroundSendBufferOverflow);
+                }
+                BackpressureMode::Block => self
+                    .bytes_permit
+                    .clone()
+                    .acquire_many_owned(batch_bytes.as_bytes_u32())
+                    .await
+                    .map_err(|_| IggyError::BackgroundSendError)?,
+                BackpressureMode::BlockWithTimeout(timeout_dur) => {
+                    match tokio::time::timeout(
+                        timeout_dur.get_duration(),
+                        self.bytes_permit
+                            .clone()
+                            .acquire_many_owned(batch_bytes.as_bytes_u32()),
+                    )
+                    .await
+                    {
+                        Ok(Ok(perm)) => perm,
+                        Ok(Err(_)) => return 
Err(IggyError::BackgroundSendError),
+                        Err(_) => return Err(IggyError::BackgroundSendTimeout),
+                    }
+                }
+            },
+        };
+
+        let permit_slot = match self.slots_permit.clone().try_acquire_owned() {
+            Ok(perm) => perm,
+            Err(_) => match self.config.failure_mode {
+                BackpressureMode::FailImmediately => {
+                    drop(permit_bytes);
+                    return Err(IggyError::BackgroundSendError);
+                }
+                BackpressureMode::Block => match 
self.slots_permit.clone().acquire_owned().await {
+                    Ok(perm) => perm,
+                    Err(_) => {
+                        drop(permit_bytes);
+                        return Err(IggyError::BackgroundSendError);
+                    }
+                },
+                BackpressureMode::BlockWithTimeout(timeout_dur) => {
+                    match tokio::time::timeout(
+                        timeout_dur.get_duration(),
+                        self.slots_permit.clone().acquire_owned(),
+                    )
+                    .await
+                    {
+                        Ok(Ok(perm)) => perm,
+                        Ok(Err(_)) => {
+                            drop(permit_bytes);
+                            return Err(IggyError::BackgroundSendError);
+                        }
+                        Err(_) => {
+                            drop(permit_bytes);
+                            return Err(IggyError::BackgroundSendTimeout);
+                        }
+                    }
+                }
+            },
+        };
+
+        let shard_ix = self.config.sharding.pick_shard(
+            self.shards.len(),
+            &shard_message.messages,
+            &shard_message.stream,
+            &shard_message.topic,
+        );
+        debug_assert!(shard_ix < self.shards.len());
+        let shard = &self.shards[shard_ix];
+
+        shard
+            .send(ShardMessageWithPermits::new(
+                shard_message,
+                permit_bytes,
+                permit_slot,
+            ))
+            .await
+    }
+
+    pub async fn shutdown(&self) {
+        if self.closed.swap(true, Ordering::Relaxed) {
+            return;
+        }
+
+        for shard in &self.shards {
+            shard.shutdown().await;
+        }
+
+        self.shutdown_notify.notify_waiters();
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::pin::Pin;
+    use std::sync::atomic::AtomicUsize;
+    use std::time::Duration;
+
+    use bytes::Bytes;
+    use tokio::time::sleep;
+
+    use crate::clients::producer::MockProducerCoreBackend;
+    use crate::clients::producer_builder::BackgroundBuilder;
+    use crate::clients::producer_error_callback::ErrorCallback;
+    use crate::clients::producer_sharding::Sharding;
+
+    use super::*;
+
+    fn dummy_identifier() -> Arc<Identifier> {
+        Arc::new(Identifier::numeric(1).unwrap())
+    }
+
+    fn dummy_message(size: usize) -> IggyMessage {
+        IggyMessage::builder()
+            .payload(Bytes::from(vec![0u8; size]))
+            .build()
+            .unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_dispatch_successful() {
+        let mut mock = MockProducerCoreBackend::new();
+        mock.expect_send_internal()
+            .times(1)
+            .returning(|_, _, _, _| Box::pin(async { Ok(()) }));
+
+        let msg = dummy_message(5);
+        let config = BackgroundBuilder::default()
+            .max_buffer_size(100.into())
+            .max_in_flight(10)
+            .num_shards(1)
+            .build();
+
+        let dispatcher = ProducerDispatcher::new(Arc::new(mock), config);
+
+        let result = dispatcher
+            .dispatch(vec![msg], dummy_identifier(), dummy_identifier(), None)
+            .await;
+
+        sleep(Duration::from_millis(100)).await;
+        assert!(result.is_ok());
+    }
+
+    #[tokio::test]
+    async fn test_dispatch_fails_on_buffer_overflow_immediate() {
+        let mock = MockProducerCoreBackend::new();
+
+        let msg = dummy_message(200);
+        let config = BackgroundBuilder::default()
+            .max_buffer_size(100.into())
+            .failure_mode(BackpressureMode::FailImmediately)
+            .num_shards(1)
+            .build();
+
+        let dispatcher = ProducerDispatcher::new(Arc::new(mock), config);
+
+        let result = dispatcher
+            .dispatch(vec![msg], dummy_identifier(), dummy_identifier(), None)
+            .await;
+
+        assert!(matches!(
+            result,
+            Err(IggyError::BackgroundSendBufferOverflow)
+        ));
+    }
+
+    #[tokio::test]
+    async fn test_dispatch_times_out_on_block_with_timeout() {
+        let mock = MockProducerCoreBackend::new();
+
+        let msg = dummy_message(200);
+        let config = BackgroundBuilder::default()
+            .max_buffer_size(100.into())
+            .max_in_flight(1)
+            .failure_mode(BackpressureMode::BlockWithTimeout(
+                Duration::from_millis(50).into(),
+            ))
+            .num_shards(1)
+            .build();
+
+        let dispatcher = ProducerDispatcher::new(Arc::new(mock), config);
+
+        let _ = dispatcher
+            .bytes_permit
+            .clone()
+            .acquire_many_owned(100)
+            .await;
+
+        let result = dispatcher
+            .dispatch(vec![msg], dummy_identifier(), dummy_identifier(), None)
+            .await;
+
+        assert!(matches!(result, Err(IggyError::BackgroundSendTimeout)));
+    }
+
+    #[tokio::test]
+    async fn test_dispatch_waits_and_succeeds_on_block_mode() {
+        let mut mock = MockProducerCoreBackend::new();
+        mock.expect_send_internal()
+            .times(1)
+            .returning(|_, _, _, _| Box::pin(async { Ok(()) }));
+
+        let msg = ShardMessage {
+            stream: dummy_identifier(),
+            topic: dummy_identifier(),
+            messages: vec![dummy_message(5)],
+            partitioning: None,
+        };
+
+        let config = BackgroundBuilder::default()
+            .max_buffer_size(msg.get_size_bytes())
+            .max_in_flight(1)
+            .failure_mode(BackpressureMode::Block)
+            .num_shards(1)
+            .build();
+
+        let dispatcher = ProducerDispatcher::new(Arc::new(mock), config);
+
+        let _block = dispatcher
+            .bytes_permit
+            .clone()
+            .acquire_many_owned(msg.get_size_bytes().as_bytes_u32())
+            .await
+            .unwrap();
+
+        let msg_clone = ShardMessage {
+            stream: msg.stream.clone(),
+            topic: msg.topic.clone(),
+            messages: msg.messages,
+            partitioning: msg.partitioning.clone(),
+        };
+
+        tokio::spawn(async move {
+            tokio::time::sleep(Duration::from_millis(100)).await;
+            drop(_block);
+        });
+
+        let result = dispatcher
+            .dispatch(
+                msg_clone.messages,
+                msg_clone.topic,
+                msg_clone.stream,
+                msg_clone.partitioning,
+            )
+            .await;
+
+        tokio::time::sleep(Duration::from_millis(200)).await;
+        assert!(result.is_ok());
+    }
+
+    #[derive(Clone, Debug)]
+    struct TestSharding {
+        called: Arc<AtomicUsize>,
+    }
+
+    impl Sharding for TestSharding {
+        fn pick_shard(
+            &self,
+            num_shards: usize,
+            _messages: &[IggyMessage],
+            _stream: &Identifier,
+            _topic: &Identifier,
+        ) -> usize {
+            self.called.fetch_add(1, Ordering::SeqCst);
+            num_shards - 1
+        }
+    }
+
+    #[derive(Clone, Debug)]
+    struct TestErrorCallback {
+        called: Arc<AtomicUsize>,
+    }
+
+    impl ErrorCallback for TestErrorCallback {
+        fn call(&self, _ctx: ErrorCtx) -> Pin<Box<dyn Future<Output = ()> + 
Send + 'static>> {
+            self.called.fetch_add(1, Ordering::SeqCst);
+            Box::pin(async {})
+        }
+    }
+
+    #[tokio::test]
+    async fn test_custom_sharding_and_error_callback() {
+        let mut mock = MockProducerCoreBackend::new();
+        mock.expect_send_internal()
+            .times(1)
+            .returning(|_, _, _, _| {
+                Box::pin(async {
+                    Err(IggyError::ProducerSendFailed {
+                        cause: "some_error".to_string(),
+                        failed: Arc::new(vec![dummy_message(10)]),
+                    })
+                })
+            });
+
+        let sharding_called = Arc::new(AtomicUsize::new(0));
+        let error_called = Arc::new(AtomicUsize::new(0));
+
+        let config = BackgroundBuilder::default()
+            .num_shards(1)
+            .error_callback(Box::new(TestErrorCallback {
+                called: error_called.clone(),
+            }))
+            .sharding(Box::new(TestSharding {
+                called: sharding_called.clone(),
+            }))
+            .build();
+
+        let dispatcher = ProducerDispatcher::new(Arc::new(mock), config);
+
+        let result = dispatcher
+            .dispatch(
+                vec![dummy_message(10)],
+                dummy_identifier(),
+                dummy_identifier(),
+                None,
+            )
+            .await;
+
+        tokio::time::sleep(Duration::from_millis(100)).await;
+
+        assert!(result.is_ok());
+        assert_eq!(sharding_called.load(Ordering::SeqCst), 1);
+        assert_eq!(error_called.load(Ordering::SeqCst), 1);
+    }
+}
diff --git a/core/sdk/src/clients/producer_error_callback.rs 
b/core/sdk/src/clients/producer_error_callback.rs
new file mode 100644
index 00000000..59a6dc45
--- /dev/null
+++ b/core/sdk/src/clients/producer_error_callback.rs
@@ -0,0 +1,67 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use std::fmt::Debug;
+use std::pin::Pin;
+use std::sync::Arc;
+
+use iggy_common::{Identifier, IggyMessage, Partitioning};
+use tracing::error;
+
+#[derive(Debug)]
+pub struct ErrorCtx {
+    pub cause: String,
+    pub stream: Arc<Identifier>,
+    pub topic: Arc<Identifier>,
+    pub partitioning: Option<Arc<Partitioning>>,
+    pub messages: Arc<Vec<IggyMessage>>,
+}
+
+/// A trait for handling background sending errors.
+///
+/// This is used when a message batch fails to send in an asynchronous 
background task.
+/// Implementors can define custom logic such as logging, retrying, alerting, 
etc.
+pub trait ErrorCallback: Send + Sync + Debug + 'static {
+    fn call(&self, ctx: ErrorCtx) -> Pin<Box<dyn Future<Output = ()> + Send + 
'static>>;
+}
+
+/// Default implementation of [`ErrorCallback`] that logs the error using 
`tracing::error!`.
+///
+/// Logs include stream, topic, optional partitioning, number of messages, and 
the cause.
+#[derive(Debug, Default)]
+pub struct LogErrorCallback;
+
+impl ErrorCallback for LogErrorCallback {
+    fn call(&self, ctx: ErrorCtx) -> Pin<Box<dyn Future<Output = ()> + Send + 
'static>> {
+        Box::pin(async move {
+            let partitioning = ctx
+                .partitioning
+                .as_ref()
+                .map(|p| format!("{:?}", p))
+                .unwrap_or_else(|| "None".to_string());
+
+            error!(
+                cause = ctx.cause,
+                stream = %ctx.stream,
+                topic = %ctx.topic,
+                partitioning = %partitioning,
+                num_messages = ctx.messages.len(),
+                "Failed to send messages in background task",
+            );
+        })
+    }
+}
diff --git a/core/sdk/src/clients/producer_sharding.rs 
b/core/sdk/src/clients/producer_sharding.rs
new file mode 100644
index 00000000..ab3757ec
--- /dev/null
+++ b/core/sdk/src/clients/producer_sharding.rs
@@ -0,0 +1,437 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+
+use iggy_common::{Identifier, IggyByteSize, IggyError, IggyMessage, 
Partitioning, Sizeable};
+use tokio::sync::{Notify, OwnedSemaphorePermit};
+use tokio::task::JoinHandle;
+use tracing::error;
+
+use crate::clients::producer::ProducerCoreBackend;
+use crate::clients::producer_config::BackgroundConfig;
+use crate::clients::producer_error_callback::ErrorCtx;
+
+/// A strategy for distributing messages across shards.
+///
+/// Implementors of this trait define how to choose a shard for a given batch 
of messages.
+/// This allows customizing message routing based on message content, 
stream/topic identifiers,
+/// or round-robin load balancing.
+pub trait Sharding: Send + Sync + std::fmt::Debug + 'static {
+    fn pick_shard(
+        &self,
+        num_shards: usize,
+        messages: &[IggyMessage],
+        stream: &Identifier,
+        topic: &Identifier,
+    ) -> usize;
+}
+
+/// A simple round-robin sharding strategy.
+/// Distributes messages evenly across all shards by incrementing an atomic 
counter.
+#[derive(Default, Debug)]
+pub struct BalancedSharding {
+    counter: AtomicUsize,
+}
+
+impl Sharding for BalancedSharding {
+    /// Picks the next shard in a round-robin fashion.
+    fn pick_shard(
+        &self,
+        num_shards: usize,
+        _: &[IggyMessage],
+        _: &Identifier,
+        _: &Identifier,
+    ) -> usize {
+        self.counter.fetch_add(1, Ordering::Relaxed) % num_shards
+    }
+}
+
+#[derive(Debug)]
+pub struct ShardMessage {
+    pub stream: Arc<Identifier>,
+    pub topic: Arc<Identifier>,
+    pub messages: Vec<IggyMessage>,
+    pub partitioning: Option<Arc<Partitioning>>,
+}
+
+impl Sizeable for ShardMessage {
+    fn get_size_bytes(&self) -> IggyByteSize {
+        let mut total = IggyByteSize::new(0);
+        total += self.stream.get_size_bytes();
+        total += self.topic.get_size_bytes();
+        for msg in &self.messages {
+            total += msg.get_size_bytes();
+        }
+        total
+    }
+}
+
+pub struct ShardMessageWithPermits {
+    pub inner: ShardMessage,
+    _bytes_permit: Option<OwnedSemaphorePermit>,
+    _slot_permit: Option<OwnedSemaphorePermit>,
+}
+
+impl ShardMessageWithPermits {
+    pub fn new(
+        msg: ShardMessage,
+        permit_bytes: OwnedSemaphorePermit,
+        permit_slot: OwnedSemaphorePermit,
+    ) -> Self {
+        Self {
+            inner: msg,
+            _bytes_permit: Some(permit_bytes),
+            _slot_permit: Some(permit_slot),
+        }
+    }
+}
+
+pub struct Shard {
+    tx: flume::Sender<ShardMessageWithPermits>,
+    shutdown_notify: Arc<Notify>,
+    closed: Arc<AtomicBool>,
+    _handle: JoinHandle<()>,
+}
+
+impl Shard {
+    pub fn new(
+        core: Arc<impl ProducerCoreBackend>,
+        config: Arc<BackgroundConfig>,
+        err_sender: flume::Sender<ErrorCtx>,
+    ) -> Self {
+        let (tx, rx) = flume::bounded::<ShardMessageWithPermits>(256);
+        let shutdown_notify = Arc::new(Notify::new());
+        let closed = Arc::new(AtomicBool::new(false));
+
+        let shutdown_notify_clone = shutdown_notify.clone();
+        let closed_clone = closed.clone();
+        let handle = tokio::spawn(async move {
+            let mut buffer = Vec::new();
+            let mut buffer_bytes = 0;
+            let mut last_flush = tokio::time::Instant::now();
+
+            loop {
+                let deadline = last_flush + config.linger_time.get_duration();
+                tokio::select! {
+                    maybe_msg = rx.recv_async() => {
+                        match maybe_msg {
+                            Ok(msg) => {
+                                buffer_bytes += 
msg.inner.get_size_bytes().as_bytes_usize();
+                                buffer.push(msg);
+
+                                let exceed_batch_len = 
config.batch_length.is_some_and(|len| buffer.len() >= len);
+                                let exceed_batch_size = 
config.batch_size.is_some_and(|size| buffer_bytes >= size);
+
+                                if exceed_batch_len || exceed_batch_size {
+                                    Self::flush_buffer(&core, &mut buffer, 
&mut buffer_bytes, &err_sender).await;
+                                    last_flush = tokio::time::Instant::now();
+                                }
+                            }
+                            Err(_) => break,
+                        }
+                    }
+                    _ = tokio::time::sleep_until(deadline) => {
+                        if !buffer.is_empty() {
+                            Self::flush_buffer(&core, &mut buffer, &mut 
buffer_bytes, &err_sender).await;
+                            last_flush = tokio::time::Instant::now();
+                        }
+                    }
+                    _ = shutdown_notify_clone.notified() => {
+                        closed_clone.store(true, Ordering::Release);
+                        if !buffer.is_empty() {
+                            Self::flush_buffer(&core, &mut buffer, &mut 
buffer_bytes, &err_sender).await;
+                        }
+                        break;
+                    }
+                }
+            }
+        });
+
+        Self {
+            tx,
+            shutdown_notify,
+            closed,
+            _handle: handle,
+        }
+    }
+
+    async fn flush_buffer(
+        core: &Arc<impl ProducerCoreBackend>,
+        buffer: &mut Vec<ShardMessageWithPermits>,
+        buffer_bytes: &mut usize,
+        err_sender: &flume::Sender<ErrorCtx>,
+    ) {
+        for msg in buffer.drain(..) {
+            let result = core
+                .send_internal(
+                    &msg.inner.stream,
+                    &msg.inner.topic,
+                    msg.inner.messages,
+                    msg.inner.partitioning.clone(),
+                )
+                .await;
+
+            if let Err(err) = result {
+                if let IggyError::ProducerSendFailed { failed, cause } = &err {
+                    let ctx = ErrorCtx {
+                        cause: cause.clone(),
+                        stream: msg.inner.stream,
+                        topic: msg.inner.topic,
+                        partitioning: msg.inner.partitioning,
+                        messages: failed.clone(),
+                    };
+                    let _ = err_sender.send_async(ctx).await;
+                } else {
+                    tracing::error!("background send failed: {err}");
+                }
+            }
+        }
+        *buffer_bytes = 0;
+    }
+
+    pub(crate) async fn send(&self, message: ShardMessageWithPermits) -> 
Result<(), IggyError> {
+        if self.closed.load(Ordering::Acquire) {
+            return Err(IggyError::ProducerClosed);
+        }
+
+        self.tx.send_async(message).await.map_err(|e| {
+            error!("Failed to send_async: {e}");
+            IggyError::BackgroundSendError
+        })
+    }
+
+    pub(crate) async fn shutdown(&self) {
+        self.shutdown_notify.notify_waiters();
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::time::Duration;
+
+    use bytes::Bytes;
+    use iggy_common::IggyDuration;
+    use tokio::{
+        sync::{Notify, Semaphore},
+        time::sleep,
+    };
+
+    use super::*;
+    use crate::clients::{producer::MockProducerCoreBackend, 
producer_builder::BackgroundBuilder};
+
+    fn dummy_identifier() -> Arc<Identifier> {
+        Arc::new(Identifier::numeric(1).unwrap())
+    }
+
+    fn dummy_message(size: usize) -> IggyMessage {
+        IggyMessage::builder()
+            .payload(Bytes::from(vec![0u8; size]))
+            .build()
+            .unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_shard_flushes_by_batch_length() {
+        let mut mock = MockProducerCoreBackend::new();
+        mock.expect_send_internal()
+            .times(10)
+            .returning(|_, _, _, _| Box::pin(async { Ok(()) }));
+
+        let bb = BackgroundBuilder::default()
+            .batch_length(10)
+            .linger_time(IggyDuration::new_from_secs(1))
+            .batch_size(10_000);
+        let config = Arc::new(bb.build());
+
+        let (permit_bytes, permit_slot) = (
+            Arc::new(Semaphore::new(100_000)),
+            Arc::new(Semaphore::new(100)),
+        );
+
+        let shard = Shard::new(Arc::new(mock), config, flume::unbounded().0);
+
+        for _ in 0..10 {
+            let message = ShardMessage {
+                stream: dummy_identifier(),
+                topic: dummy_identifier(),
+                messages: vec![dummy_message(1)],
+                partitioning: None,
+            };
+            let wrapped = ShardMessageWithPermits::new(
+                message,
+                permit_bytes.clone().acquire_many_owned(1).await.unwrap(),
+                permit_slot.clone().acquire_owned().await.unwrap(),
+            );
+            shard.send(wrapped).await.unwrap();
+        }
+
+        sleep(Duration::from_millis(100)).await;
+    }
+
+    #[tokio::test]
+    async fn test_shard_flushes_by_batch_size() {
+        let mut mock = MockProducerCoreBackend::new();
+        mock.expect_send_internal()
+            .times(1)
+            .returning(|_, _, _, _| Box::pin(async { Ok(()) }));
+
+        let bb = BackgroundBuilder::default()
+            .batch_length(1000)
+            .linger_time(IggyDuration::new_from_secs(1))
+            .batch_size(10_000);
+        let config = Arc::new(bb.build());
+
+        let (permit_bytes, permit_slot) = (
+            Arc::new(Semaphore::new(10_000)),
+            Arc::new(Semaphore::new(100)),
+        );
+
+        let shard = Shard::new(Arc::new(mock), config, flume::unbounded().0);
+
+        let message = ShardMessage {
+            stream: dummy_identifier(),
+            topic: dummy_identifier(),
+            messages: vec![dummy_message(10_000)],
+            partitioning: None,
+        };
+        let wrapped = ShardMessageWithPermits::new(
+            message,
+            permit_bytes
+                .clone()
+                .acquire_many_owned(10_000)
+                .await
+                .unwrap(),
+            permit_slot.clone().acquire_owned().await.unwrap(),
+        );
+        shard.send(wrapped).await.unwrap();
+
+        sleep(Duration::from_millis(100)).await;
+    }
+
+    #[tokio::test]
+    async fn test_shard_flushes_by_timeout() {
+        let mut mock = MockProducerCoreBackend::new();
+        mock.expect_send_internal()
+            .times(1)
+            .returning(|_, _, _, _| Box::pin(async { Ok(()) }));
+
+        let bb = BackgroundBuilder::default()
+            .batch_length(10)
+            .linger_time(IggyDuration::new(Duration::from_millis(50)))
+            .batch_size(10_000);
+        let config = Arc::new(bb.build());
+
+        let (permit_bytes, permit_slot) = (
+            Arc::new(Semaphore::new(10_000)),
+            Arc::new(Semaphore::new(100)),
+        );
+
+        let shard = Shard::new(Arc::new(mock), config, flume::unbounded().0);
+
+        let message = ShardMessage {
+            stream: dummy_identifier(),
+            topic: dummy_identifier(),
+            messages: vec![dummy_message(1)],
+            partitioning: None,
+        };
+        let wrapped = ShardMessageWithPermits::new(
+            message,
+            permit_bytes.clone().acquire_many_owned(1).await.unwrap(),
+            permit_slot.clone().acquire_owned().await.unwrap(),
+        );
+        shard.send(wrapped).await.unwrap();
+
+        sleep(Duration::from_millis(100)).await;
+    }
+
+    #[tokio::test]
+    async fn test_shard_forwards_error() {
+        let mut mock = MockProducerCoreBackend::new();
+        let error = IggyError::ProducerSendFailed {
+            failed: Arc::new(vec![dummy_message(1)]),
+            cause: "test error".into(),
+        };
+
+        mock.expect_send_internal().returning(move |_, _, _, _| {
+            let err = error.clone();
+            Box::pin(async move { Err(err) })
+        });
+
+        let (err_tx, err_rx) = flume::unbounded();
+        let bb = BackgroundBuilder::default();
+        let config = Arc::new(bb.build());
+
+        let (permit_bytes, permit_slot) = (
+            Arc::new(Semaphore::new(10_000)),
+            Arc::new(Semaphore::new(100)),
+        );
+
+        let shard = Shard::new(Arc::new(mock), config, err_tx);
+
+        let message = ShardMessage {
+            stream: dummy_identifier(),
+            topic: dummy_identifier(),
+            messages: vec![dummy_message(1)],
+            partitioning: None,
+        };
+        let wrapped = ShardMessageWithPermits::new(
+            message,
+            permit_bytes.clone().acquire_many_owned(1).await.unwrap(),
+            permit_slot.clone().acquire_owned().await.unwrap(),
+        );
+        shard.send(wrapped).await.unwrap();
+
+        let err_ctx = err_rx.recv_async().await.unwrap();
+        assert_eq!(err_ctx.cause, "test error");
+        assert_eq!(err_ctx.messages.len(), 1);
+    }
+
+    #[tokio::test]
+    async fn test_shard_send_error_on_closed_channel() {
+        let (tx, rx) = flume::bounded::<ShardMessageWithPermits>(1);
+        drop(rx);
+
+        let shard = Shard {
+            tx,
+            shutdown_notify: Arc::new(Notify::new()),
+            closed: Arc::new(AtomicBool::new(false)),
+            _handle: tokio::spawn(async {}),
+        };
+
+        let (permit_bytes, permit_slot) = (
+            Arc::new(Semaphore::new(10_000)),
+            Arc::new(Semaphore::new(100)),
+        );
+
+        let message = ShardMessage {
+            stream: dummy_identifier(),
+            topic: dummy_identifier(),
+            messages: vec![dummy_message(1)],
+            partitioning: None,
+        };
+        let wrapped = ShardMessageWithPermits::new(
+            message,
+            permit_bytes.clone().acquire_many_owned(1).await.unwrap(),
+            permit_slot.clone().acquire_owned().await.unwrap(),
+        );
+
+        let result = shard.send(wrapped).await;
+        assert!(matches!(result, Err(IggyError::BackgroundSendError)));
+    }
+}
diff --git a/core/sdk/src/stream_builder/build/build_iggy_producer.rs 
b/core/sdk/src/stream_builder/build/build_iggy_producer.rs
index 9b9a1cdf..afe8d122 100644
--- a/core/sdk/src/stream_builder/build/build_iggy_producer.rs
+++ b/core/sdk/src/stream_builder/build/build_iggy_producer.rs
@@ -56,8 +56,6 @@ pub(crate) async fn build_iggy_producer(
     trace!("Build iggy producer");
     let mut builder = client
         .producer(stream, topic)?
-        .batch_length(batch_length)
-        .linger_time(linger_time)
         .partitioning(partitioning)
         .create_stream_if_not_exists()
         .send_retries(send_retries, send_retries_interval)
@@ -66,14 +64,15 @@ pub(crate) async fn build_iggy_producer(
             topic_replication_factor,
             IggyExpiry::ServerDefault,
             MaxTopicSize::ServerDefault,
-        );
+        )
+        .sync(|b| b.batch_length(batch_length).linger_time(linger_time));
 
     if let Some(encryptor) = config.encryptor() {
         builder = builder.encryptor(encryptor);
     }
 
     trace!("Initialize iggy producer");
-    let mut producer = builder.build();
+    let producer = builder.build();
     producer.init().await.map_err(|err| {
         error!("Failed to initialize consumer: {err}");
         err

Reply via email to