This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch feat/add-background-send
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 1570c5d033ad15db8c8365c2d18437801af04615
Author: haze518 <[email protected]>
AuthorDate: Wed May 21 10:10:06 2025 +0600

    del
---
 core/common/src/error/iggy_error.rs                |   8 +
 core/common/src/types/message/iggy_message.rs      |   2 +-
 core/integration/tests/examples/mod.rs             |   1 +
 .../tests/examples/test_new_publisher.rs           |  87 ++++
 core/sdk/src/clients/mod.rs                        |   1 +
 core/sdk/src/clients/producer.rs                   | 450 +++++++++++++++------
 core/sdk/src/clients/producer_builder.rs           |  11 +
 core/sdk/src/clients/send_mode.rs                  |  27 ++
 8 files changed, 458 insertions(+), 129 deletions(-)

diff --git a/core/common/src/error/iggy_error.rs 
b/core/common/src/error/iggy_error.rs
index b8076e2e..5e812288 100644
--- a/core/common/src/error/iggy_error.rs
+++ b/core/common/src/error/iggy_error.rs
@@ -361,6 +361,14 @@ pub enum IggyError {
     TooSmallMessage(u32, u32) = 4037,
     #[error("Cannot sed messages due to client disconnection")]
     CannotSendMessagesDueToClientDisconnection = 4050,
+    #[error("Background send error")]
+    BackgroundSendError = 4051,
+    #[error("Background send timeout")]
+    BackgroundSendTimeout = 4052,
+    #[error("Background send buffer is full")]
+    BackgroundSendBufferFull = 4053,
+    #[error("Background worker disconnected")]
+    BackgroundWorkerDisconnected = 4054,
     #[error("Invalid offset: {0}")]
     InvalidOffset(u64) = 4100,
     #[error("Consumer group with ID: {0} for topic with ID: {1} was not 
found.")]
diff --git a/core/common/src/types/message/iggy_message.rs 
b/core/common/src/types/message/iggy_message.rs
index 54c3a841..ddd4076f 100644
--- a/core/common/src/types/message/iggy_message.rs
+++ b/core/common/src/types/message/iggy_message.rs
@@ -105,7 +105,7 @@ pub const MAX_USER_HEADERS_SIZE: u32 = 100 * 1000;
 ///     .build()
 ///     .unwrap();
 /// ```
-#[derive(Debug, PartialEq)]
+#[derive(Debug, PartialEq, Default)]
 pub struct IggyMessage {
     /// Message metadata
     pub header: IggyMessageHeader,
diff --git a/core/integration/tests/examples/mod.rs 
b/core/integration/tests/examples/mod.rs
index 015eaa87..8d960e66 100644
--- a/core/integration/tests/examples/mod.rs
+++ b/core/integration/tests/examples/mod.rs
@@ -20,6 +20,7 @@ mod test_basic;
 mod test_getting_started;
 mod test_message_envelope;
 mod test_message_headers;
+mod test_new_publisher;
 
 use assert_cmd::Command;
 use iggy::clients::client::IggyClient;
diff --git a/core/integration/tests/examples/test_new_publisher.rs 
b/core/integration/tests/examples/test_new_publisher.rs
new file mode 100644
index 00000000..ac8af5dc
--- /dev/null
+++ b/core/integration/tests/examples/test_new_publisher.rs
@@ -0,0 +1,87 @@
+// temporary file, do not forget to delete!
+
+use std::str::FromStr;
+use std::sync::Arc;
+
+use iggy::clients::send_mode::{BackgroundConfig, BackpressureMode, SendMode};
+use iggy::prelude::defaults::*;
+use iggy::prelude::*;
+use iggy::{clients::client::IggyClient, prelude::TcpClient};
+use iggy_common::TcpClientConfig;
+use integration::test_server::{IpAddrKind, TestServer};
+
+#[tokio::test]
+async fn test_new_publisher() {
+    // new
+    let mut server = TestServer::new(None, true, None, IpAddrKind::V4);
+    server.start();
+
+    let tcp_client_config = TcpClientConfig {
+        server_address: server.get_raw_tcp_addr().unwrap(),
+        ..TcpClientConfig::default()
+    };
+    let client = 
Box::new(TcpClient::create(Arc::new(tcp_client_config)).unwrap());
+    let client = IggyClient::create(client, None, None);
+
+    // setup
+    client.connect().await.unwrap();
+    let ping_result = client.ping().await;
+    assert!(ping_result.is_ok(), "Failed to ping server");
+
+    client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    client
+        .create_stream("sample-stream", Some(1))
+        .await
+        .unwrap();
+    client
+        .create_topic(
+            &1.try_into().unwrap(),
+            "sample-topic",
+            1,
+            CompressionAlgorithm::default(),
+            None,
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+
+    let mut producer = client
+        .producer("1", "1")
+        .unwrap()
+        .batch_length(10)
+        .send_mode(SendMode::Background(BackgroundConfig{
+            max_in_flight: 10,
+            in_flight_timeout: None,
+            batch_size: None,
+            failure_mode: BackpressureMode::Block,
+        }))
+        .build();
+    
+    producer.init().await.unwrap();
+
+    // produce
+    let mut send_batches = 0;
+
+    let batch_limit = 100;
+    let mut count = 0;
+    loop {
+        if send_batches >= batch_limit {
+            return
+        }
+
+        let mut messages = Vec::new();
+        for _ in 0..10 {
+            let message = 
IggyMessage::from_str(format!("{count}").as_str()).unwrap();
+            messages.push(message);
+            count += 1;
+        }
+
+        producer.send(messages).await.unwrap();
+        send_batches += 1;
+    }
+}
diff --git a/core/sdk/src/clients/mod.rs b/core/sdk/src/clients/mod.rs
index be854f1b..fc872904 100644
--- a/core/sdk/src/clients/mod.rs
+++ b/core/sdk/src/clients/mod.rs
@@ -32,6 +32,7 @@ pub mod consumer;
 pub mod consumer_builder;
 pub mod producer;
 pub mod producer_builder;
+pub mod send_mode;
 
 const ORDERING: std::sync::atomic::Ordering = 
std::sync::atomic::Ordering::SeqCst;
 const MAX_BATCH_SIZE: usize = 1000000;
diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index f3706e3a..93d64322 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -1,3 +1,4 @@
+use super::send_mode::{BackpressureMode, SendMode};
 /* Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -28,6 +29,8 @@ use iggy_common::{
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicU64};
 use std::time::Duration;
+use tokio::sync::Semaphore;
+use tokio::task::JoinHandle;
 use tokio::time::{Interval, sleep};
 use tracing::{error, info, trace, warn};
 
@@ -44,6 +47,7 @@ pub struct IggyProducer {
     topic_name: String,
     batch_length: Option<usize>,
     partitioning: Option<Arc<Partitioning>>,
+    send_mode: Arc<SendMode>,
     encryptor: Option<Arc<EncryptorKind>>,
     partitioner: Option<Arc<dyn Partitioner>>,
     linger_time_micros: u64,
@@ -58,6 +62,11 @@ pub struct IggyProducer {
     last_sent_at: Arc<AtomicU64>,
     send_retries_count: Option<u32>,
     send_retries_interval: Option<IggyDuration>,
+
+    _join_handle: Option<JoinHandle<()>>,
+    sema: Semaphore,
+    failure_mode: BackpressureMode,
+    sender: Option<Arc<flume::Sender<Vec<IggyMessage>>>>,
 }
 
 impl IggyProducer {
@@ -81,6 +90,7 @@ impl IggyProducer {
         topic_max_size: MaxTopicSize,
         send_retries_count: Option<u32>,
         send_retries_interval: Option<IggyDuration>,
+        send_mode: SendMode,
     ) -> Self {
         Self {
             initialized: false,
@@ -92,6 +102,7 @@ impl IggyProducer {
             topic_name,
             batch_length,
             partitioning: partitioning.map(Arc::new),
+            send_mode: Arc::new(send_mode),
             encryptor,
             partitioner,
             linger_time_micros: interval.map_or(0, |i| i.as_micros()),
@@ -106,6 +117,10 @@ impl IggyProducer {
             last_sent_at: Arc::new(AtomicU64::new(0)),
             send_retries_count,
             send_retries_interval,
+            _join_handle: None,
+            sema: Semaphore::new(10),
+            failure_mode: BackpressureMode::Block,
+            sender: None,
         }
     }
 
@@ -179,8 +194,49 @@ impl IggyProducer {
                 .await?;
         }
 
+        let (tx, rx) = flume::unbounded::<Vec<IggyMessage>>();
+        let stream_id = self.stream_id.clone();
+        let topic_id = self.topic_id.clone();
+        let partitioning = self.partitioning.clone();
+        let default_partitioning = self.default_partitioning.clone();
+        let partitioner = self.partitioner.clone();
+        let client = self.client.clone();
+        let send_retries_count = self.send_retries_count.clone();
+        let send_retries_interval = self.send_retries_interval.clone();
+        let can_send = self.can_send.clone();
+
+        let handle = tokio::spawn(async move {
+            while let Ok(mut batch) = rx.recv_async().await {
+                let partitioning = get_partitioning(
+                    &partitioner,
+                    &stream_id,
+                    &topic_id,
+                    &batch,
+                    partitioning.clone(),
+                    &partitioning,
+                    default_partitioning.clone(),
+                )
+                .unwrap();
+                try_send_messages(
+                    client.clone(),
+                    send_retries_count,
+                    send_retries_interval,
+                    can_send.clone(),
+                    &stream_id,
+                    &topic_id,
+                    &partitioning,
+                    &mut batch,
+                ).await.unwrap();
+            }
+        });
+        self._join_handle = Some(handle);
+        self.sender = Some(Arc::new(tx));
         self.initialized = true;
-        info!("Producer has been initialized for stream: {stream_id} and 
topic: {topic_id}.");
+        info!(
+            "Producer has been initialized for stream: {} and topic: {}.",
+            self.stream_id.clone(),
+            self.topic_id.clone()
+        );
         Ok(())
     }
 
@@ -301,7 +357,17 @@ impl IggyProducer {
         partitioning: Option<Arc<Partitioning>>,
     ) -> Result<(), IggyError> {
         self.encrypt_messages(&mut messages)?;
-        let partitioning = self.get_partitioning(&stream, &topic, &messages, 
partitioning)?;
+        let default_partitioning = self.default_partitioning.clone();
+        let partitioner = self.partitioner.clone();
+        let partitioning = get_partitioning(
+            &partitioner,
+            &stream,
+            &topic,
+            &messages,
+            partitioning,
+            &self.partitioning,
+            default_partitioning,
+        )?;
         let batch_length = self.batch_length.unwrap_or(MAX_BATCH_SIZE);
         let batches = messages.chunks_mut(batch_length);
         let mut current_batch = 1;
@@ -321,8 +387,28 @@ impl IggyProducer {
             );
             self.last_sent_at
                 .store(IggyTimestamp::now().into(), ORDERING);
-            self.try_send_messages(&self.stream_id, &self.topic_id, 
&partitioning, batch)
-                .await?;
+
+            let client = self.client.clone();
+            let send_retries_count = self.send_retries_count.clone();
+            let send_retries_interval = self.send_retries_interval.clone();
+            let can_send = self.can_send.clone();
+
+            let sender = self.sender.clone();
+            let send_mode = self.send_mode.clone();
+            try_send_messages_new(
+                BackpressureMode::Block,
+                sender,
+                send_mode,
+                client,
+                send_retries_count,
+                send_retries_interval,
+                can_send,
+                &self.stream_id,
+                &self.topic_id,
+                &partitioning,
+                batch,
+            ).await?;
+
             trace!("Sent {messages_count} messages 
({current_batch}/{batches_count} batch(es)).");
             current_batch += 1;
         }
@@ -338,21 +424,56 @@ impl IggyProducer {
     ) -> Result<(), IggyError> {
         trace!("No batch size specified, sending messages immediately.");
         self.encrypt_messages(&mut messages)?;
-        let partitioning = self.get_partitioning(stream, topic, &messages, 
partitioning)?;
+        let default_partitioning = self.default_partitioning.clone();
+        let partitioner = self.partitioner.clone();
+        let client = self.client.clone();
+        let send_retries_count = self.send_retries_count.clone();
+        let send_retries_interval = self.send_retries_interval.clone();
+        let can_send = self.can_send.clone();
+
+        let partitioning = get_partitioning(
+            &partitioner,
+            &stream,
+            &topic,
+            &messages,
+            partitioning,
+            &self.partitioning,
+            default_partitioning,
+        )?;
         let batch_length = self.batch_length.unwrap_or(MAX_BATCH_SIZE);
         if messages.len() <= batch_length {
             self.last_sent_at
                 .store(IggyTimestamp::now().into(), ORDERING);
-            self.try_send_messages(stream, topic, &partitioning, &mut messages)
-                .await?;
+            try_send_messages(
+                client.clone(),
+                send_retries_count,
+                send_retries_interval,
+                can_send,
+                stream,
+                topic,
+                &partitioning,
+                &mut messages,
+            )
+            .await?;
             return Ok(());
         }
 
         for batch in messages.chunks_mut(batch_length) {
+            let client = client.clone();
+            let can_send = can_send.clone();
             self.last_sent_at
                 .store(IggyTimestamp::now().into(), ORDERING);
-            self.try_send_messages(stream, topic, &partitioning, batch)
-                .await?;
+            try_send_messages(
+                client,
+                send_retries_count,
+                send_retries_interval,
+                can_send,
+                stream,
+                topic,
+                &partitioning,
+                batch,
+            )
+            .await?;
         }
         Ok(())
     }
@@ -385,144 +506,217 @@ impl IggyProducer {
         }
         Ok(())
     }
+}
 
-    async fn try_send_messages(
-        &self,
-        stream: &Identifier,
-        topic: &Identifier,
-        partitioning: &Arc<Partitioning>,
-        messages: &mut [IggyMessage],
-    ) -> Result<(), IggyError> {
-        let client = self.client.read().await;
-        let Some(max_retries) = self.send_retries_count else {
-            return client
-                .send_messages(stream, topic, partitioning, messages)
-                .await;
-        };
+fn get_partitioning(
+    partitioner: &Option<Arc<dyn Partitioner>>,
+    stream: &Identifier,
+    topic: &Identifier,
+    messages: &[IggyMessage],
+    partitioning: Option<Arc<Partitioning>>,
+    producer_partitioning: &Option<Arc<Partitioning>>,
+    default_partitioning: Arc<Partitioning>,
+) -> Result<Arc<Partitioning>, IggyError> {
+    if let Some(partitioner) = partitioner {
+        trace!("Calculating partition id using custom partitioner.");
+        let partition_id = partitioner.calculate_partition_id(stream, topic, 
messages)?;
+        Ok(Arc::new(Partitioning::partition_id(partition_id)))
+    } else {
+        trace!("Using the provided partitioning.");
+        Ok(partitioning.unwrap_or_else(|| {
+            producer_partitioning
+                .clone()
+                .unwrap_or_else(|| default_partitioning.clone())
+        }))
+    }
+}
 
-        if max_retries == 0 {
-            return client
-                .send_messages(stream, topic, partitioning, messages)
-                .await;
+async fn wait_until_connected(
+    can_send: Arc<AtomicBool>,
+    max_retries: u32,
+    stream: &Identifier,
+    topic: &Identifier,
+    timer: &mut Option<Interval>,
+) -> Result<(), IggyError> {
+    let mut retries = 0;
+    while !can_send.load(ORDERING) {
+        retries += 1;
+        if retries > max_retries {
+            error!(
+                "Failed to send messages to topic: {topic}, stream: {stream} \
+                 after {max_retries} retries. Client is disconnected."
+            );
+            return Err(IggyError::CannotSendMessagesDueToClientDisconnection);
         }
 
-        let mut timer = if let Some(interval) = self.send_retries_interval {
-            let mut timer = tokio::time::interval(interval.get_duration());
-            timer.tick().await;
-            Some(timer)
-        } else {
-            None
-        };
+        error!(
+            "Trying to send messages to topic: {topic}, stream: {stream} \
+             but the client is disconnected. Retrying 
{retries}/{max_retries}..."
+        );
 
-        self.wait_until_connected(max_retries, stream, topic, &mut timer)
-            .await?;
-        self.send_with_retries(
-            max_retries,
-            stream,
-            topic,
-            partitioning,
-            messages,
-            &mut timer,
-        )
-        .await
+        if let Some(timer) = timer.as_mut() {
+            trace!(
+                "Waiting for the next retry to send messages to topic: 
{topic}, \
+                 stream: {stream} for disconnected client..."
+            );
+            timer.tick().await;
+        }
     }
+    Ok(())
+}
+
+async fn send_with_retries(
+    client: Arc<IggySharedMut<Box<dyn Client>>>,
+    max_retries: u32,
+    stream: &Identifier,
+    topic: &Identifier,
+    partitioning: &Arc<Partitioning>,
+    messages: &mut [IggyMessage],
+    timer: &mut Option<Interval>,
+) -> Result<(), IggyError> {
+    let client = client.read().await;
+    let mut retries = 0;
+    loop {
+        match client
+            .send_messages(stream, topic, partitioning, messages)
+            .await
+        {
+            Ok(_) => return Ok(()),
+            Err(error) => {
+                retries += 1;
+                if retries > max_retries {
+                    error!(
+                        "Failed to send messages to topic: {topic}, stream: 
{stream} \
+                         after {max_retries} retries. {error}."
+                    );
+                    return Err(error);
+                }
 
-    async fn wait_until_connected(
-        &self,
-        max_retries: u32,
-        stream: &Identifier,
-        topic: &Identifier,
-        timer: &mut Option<Interval>,
-    ) -> Result<(), IggyError> {
-        let mut retries = 0;
-        while !self.can_send.load(ORDERING) {
-            retries += 1;
-            if retries > max_retries {
                 error!(
-                    "Failed to send messages to topic: {topic}, stream: 
{stream} \
-                     after {max_retries} retries. Client is disconnected."
+                    "Failed to send messages to topic: {topic}, stream: 
{stream}. \
+                     {error} Retrying {retries}/{max_retries}..."
                 );
-                return 
Err(IggyError::CannotSendMessagesDueToClientDisconnection);
-            }
 
-            error!(
-                "Trying to send messages to topic: {topic}, stream: {stream} \
-                 but the client is disconnected. Retrying 
{retries}/{max_retries}..."
-            );
-
-            if let Some(timer) = timer.as_mut() {
-                trace!(
-                    "Waiting for the next retry to send messages to topic: 
{topic}, \
-                     stream: {stream} for disconnected client..."
-                );
-                timer.tick().await;
+                if let Some(t) = timer.as_mut() {
+                    trace!(
+                        "Waiting for the next retry to send messages to topic: 
{topic}, \
+                         stream: {stream}..."
+                    );
+                    t.tick().await;
+                }
             }
         }
-        Ok(())
     }
+}
 
-    async fn send_with_retries(
-        &self,
-        max_retries: u32,
-        stream: &Identifier,
-        topic: &Identifier,
-        partitioning: &Arc<Partitioning>,
-        messages: &mut [IggyMessage],
-        timer: &mut Option<Interval>,
-    ) -> Result<(), IggyError> {
-        let client = self.client.read().await;
-        let mut retries = 0;
-        loop {
-            match client
-                .send_messages(stream, topic, partitioning, messages)
-                .await
-            {
-                Ok(_) => return Ok(()),
-                Err(error) => {
-                    retries += 1;
-                    if retries > max_retries {
-                        error!(
-                            "Failed to send messages to topic: {topic}, 
stream: {stream} \
-                             after {max_retries} retries. {error}."
-                        );
-                        return Err(error);
-                    }
+async fn try_send_messages(
+    client: Arc<IggySharedMut<Box<dyn Client>>>,
+    send_retries_count: Option<u32>,
+    send_retries_interval: Option<IggyDuration>,
+    can_send: Arc<AtomicBool>,
+    stream: &Identifier,
+    topic: &Identifier,
+    partitioning: &Arc<Partitioning>,
+    messages: &mut [IggyMessage],
+) -> Result<(), IggyError> {
+    let rw_client = client.read().await;
+    let Some(max_retries) = send_retries_count else {
+        return rw_client
+            .send_messages(stream, topic, partitioning, messages)
+            .await;
+    };
+
+    if max_retries == 0 {
+        return rw_client
+            .send_messages(stream, topic, partitioning, messages)
+            .await;
+    }
 
-                    error!(
-                        "Failed to send messages to topic: {topic}, stream: 
{stream}. \
-                         {error} Retrying {retries}/{max_retries}..."
-                    );
+    let mut timer = if let Some(interval) = send_retries_interval {
+        let mut timer = tokio::time::interval(interval.get_duration());
+        timer.tick().await;
+        Some(timer)
+    } else {
+        None
+    };
+
+    wait_until_connected(can_send.clone(), max_retries, stream, topic, &mut 
timer).await?;
+    send_with_retries(
+        client.clone(),
+        max_retries,
+        stream,
+        topic,
+        partitioning,
+        messages,
+        &mut timer,
+    )
+    .await
+}
+
+async fn try_send_messages_new(
+    backpressure_mode: BackpressureMode,
+    sender: Option<Arc<flume::Sender<Vec<IggyMessage>>>>,
+    send_mode: Arc<SendMode>,
+    client: Arc<IggySharedMut<Box<dyn Client>>>,
+    send_retries_count: Option<u32>,
+    send_retries_interval: Option<IggyDuration>,
+    can_send: Arc<AtomicBool>,
+    stream: &Identifier,
+    topic: &Identifier,
+    partitioning: &Arc<Partitioning>,
+    messages: &mut [IggyMessage],
+) -> Result<(), IggyError> {
+    match &*send_mode {
+        SendMode::Sync => {
+            try_send_messages(
+                client,
+                send_retries_count,
+                send_retries_interval,
+                can_send,
+                stream,
+                topic,
+                partitioning,
+                messages,
+            )
+            .await
+        }
 
-                    if let Some(t) = timer.as_mut() {
-                        trace!(
-                            "Waiting for the next retry to send messages to 
topic: {topic}, \
-                             stream: {stream}..."
-                        );
-                        t.tick().await;
+        SendMode::Background(_cfg) => {
+            let out: Vec<IggyMessage> = messages
+                .iter_mut()
+                .map(|m| std::mem::take(m))
+                .collect();
+
+            match sender.as_ref().unwrap().try_send(out) {
+                Ok(()) => Ok(()),
+
+                Err(flume::TrySendError::Full(out)) => {
+                    match backpressure_mode {
+                        BackpressureMode::Block => {
+                            sender
+                                .unwrap()
+                                .send_async(out)
+                                .await
+                                .map_err(|_| IggyError::BackgroundSendError)
+                        }
+                        BackpressureMode::BlockWithTimeout(timeout) => {
+                            match tokio::time::timeout(timeout.get_duration(), 
sender.unwrap().send_async(out))
+                                .await
+                            {
+                                Ok(Ok(())) => Ok(()),
+                                Ok(Err(_)) | Err(_) => 
Err(IggyError::BackgroundSendTimeout),
+                            }
+                        }
+                        BackpressureMode::FailImmediately => {
+                            Err(IggyError::BackgroundSendBufferFull)
+                        }
                     }
                 }
+                Err(flume::TrySendError::Disconnected(_)) => {
+                    error!("Background worker has shut down.");
+                    Err(IggyError::BackgroundWorkerDisconnected)
+                }
             }
         }
     }
-
-    fn get_partitioning(
-        &self,
-        stream: &Identifier,
-        topic: &Identifier,
-        messages: &[IggyMessage],
-        partitioning: Option<Arc<Partitioning>>,
-    ) -> Result<Arc<Partitioning>, IggyError> {
-        if let Some(partitioner) = &self.partitioner {
-            trace!("Calculating partition id using custom partitioner.");
-            let partition_id = partitioner.calculate_partition_id(stream, 
topic, messages)?;
-            Ok(Arc::new(Partitioning::partition_id(partition_id)))
-        } else {
-            trace!("Using the provided partitioning.");
-            Ok(partitioning.unwrap_or_else(|| {
-                self.partitioning
-                    .clone()
-                    .unwrap_or_else(|| self.default_partitioning.clone())
-            }))
-        }
-    }
 }
diff --git a/core/sdk/src/clients/producer_builder.rs 
b/core/sdk/src/clients/producer_builder.rs
index 76ecfeea..34ffb7e1 100644
--- a/core/sdk/src/clients/producer_builder.rs
+++ b/core/sdk/src/clients/producer_builder.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use super::send_mode::{self, SendMode};
 use super::MAX_BATCH_SIZE;
 use crate::prelude::IggyProducer;
 use iggy_binary_protocol::Client;
@@ -36,6 +37,7 @@ pub struct IggyProducerBuilder {
     encryptor: Option<Arc<EncryptorKind>>,
     partitioner: Option<Arc<dyn Partitioner>>,
     linger_time: Option<IggyDuration>,
+    send_mode: SendMode,
     create_stream_if_not_exists: bool,
     create_topic_if_not_exists: bool,
     topic_partitions_count: u32,
@@ -67,6 +69,7 @@ impl IggyProducerBuilder {
             partitioning: None,
             encryptor,
             partitioner,
+            send_mode: SendMode::default(),
             linger_time: Some(IggyDuration::from(1000)),
             create_stream_if_not_exists: true,
             create_topic_if_not_exists: true,
@@ -125,6 +128,13 @@ impl IggyProducerBuilder {
         }
     }
 
+    pub fn send_mode(self, send_mode: SendMode) -> Self {
+        Self {
+            send_mode,
+            ..self
+        }
+    }
+
     /// Sets the encryptor for encrypting the messages' payloads.
     pub fn encryptor(self, encryptor: Arc<EncryptorKind>) -> Self {
         Self {
@@ -249,6 +259,7 @@ impl IggyProducerBuilder {
             self.topic_max_size,
             self.send_retries_count,
             self.send_retries_interval,
+            self.send_mode,
         )
     }
 }
diff --git a/core/sdk/src/clients/send_mode.rs 
b/core/sdk/src/clients/send_mode.rs
new file mode 100644
index 00000000..98b3abef
--- /dev/null
+++ b/core/sdk/src/clients/send_mode.rs
@@ -0,0 +1,27 @@
+use iggy_common::IggyDuration;
+
+#[derive(Debug, Clone, Default)]
+pub enum SendMode {
+    #[default]
+    Sync,
+    Background(BackgroundConfig),
+}
+
+#[derive(Debug, Clone)]
+/// Determines how the `send_messages` API should behave when problem is 
encountered
+pub enum BackpressureMode {
+    /// Block until the send succeeds
+    Block,
+    /// Block with a timeout, after which the send fails
+    BlockWithTimeout(IggyDuration),
+    /// Fail immediately without retrying
+    FailImmediately,
+}
+
+#[derive(Debug, Clone)]
+pub struct BackgroundConfig {
+    pub max_in_flight: usize,
+    pub in_flight_timeout: Option<IggyDuration>,
+    pub batch_size: Option<usize>,
+    pub failure_mode: BackpressureMode,
+}

Reply via email to