This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch feat/add-background-send
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit c759e1764eff4f89623c0a2f37ec68dc4621f00f
Author: haze518 <[email protected]>
AuthorDate: Thu May 22 08:53:43 2025 +0600

    del
---
 core/common/src/error/iggy_error.rs                |   8 +
 core/common/src/types/message/iggy_message.rs      |   2 +-
 core/integration/tests/examples/mod.rs             |   1 +
 .../tests/examples/test_new_publisher.rs           | 126 +++++
 core/sdk/src/clients/client.rs                     |   1 +
 core/sdk/src/clients/mod.rs                        |   3 +-
 core/sdk/src/clients/producer.rs                   | 517 +++++++++++++++------
 core/sdk/src/clients/producer_builder.rs           |  27 +-
 core/sdk/src/clients/send_mode.rs                  |  27 ++
 core/sdk/src/prelude.rs                            |   2 +-
 10 files changed, 578 insertions(+), 136 deletions(-)

diff --git a/core/common/src/error/iggy_error.rs 
b/core/common/src/error/iggy_error.rs
index b8076e2e..5e812288 100644
--- a/core/common/src/error/iggy_error.rs
+++ b/core/common/src/error/iggy_error.rs
@@ -361,6 +361,14 @@ pub enum IggyError {
     TooSmallMessage(u32, u32) = 4037,
     #[error("Cannot sed messages due to client disconnection")]
     CannotSendMessagesDueToClientDisconnection = 4050,
+    #[error("Background send error")]
+    BackgroundSendError = 4051,
+    #[error("Background send timeout")]
+    BackgroundSendTimeout = 4052,
+    #[error("Background send buffer is full")]
+    BackgroundSendBufferFull = 4053,
+    #[error("Background worker disconnected")]
+    BackgroundWorkerDisconnected = 4054,
     #[error("Invalid offset: {0}")]
     InvalidOffset(u64) = 4100,
     #[error("Consumer group with ID: {0} for topic with ID: {1} was not 
found.")]
diff --git a/core/common/src/types/message/iggy_message.rs 
b/core/common/src/types/message/iggy_message.rs
index 54c3a841..ddd4076f 100644
--- a/core/common/src/types/message/iggy_message.rs
+++ b/core/common/src/types/message/iggy_message.rs
@@ -105,7 +105,7 @@ pub const MAX_USER_HEADERS_SIZE: u32 = 100 * 1000;
 ///     .build()
 ///     .unwrap();
 /// ```
-#[derive(Debug, PartialEq)]
+#[derive(Debug, PartialEq, Default)]
 pub struct IggyMessage {
     /// Message metadata
     pub header: IggyMessageHeader,
diff --git a/core/integration/tests/examples/mod.rs 
b/core/integration/tests/examples/mod.rs
index 015eaa87..8d960e66 100644
--- a/core/integration/tests/examples/mod.rs
+++ b/core/integration/tests/examples/mod.rs
@@ -20,6 +20,7 @@ mod test_basic;
 mod test_getting_started;
 mod test_message_envelope;
 mod test_message_headers;
+mod test_new_publisher;
 
 use assert_cmd::Command;
 use iggy::clients::client::IggyClient;
diff --git a/core/integration/tests/examples/test_new_publisher.rs 
b/core/integration/tests/examples/test_new_publisher.rs
new file mode 100644
index 00000000..8357f1f1
--- /dev/null
+++ b/core/integration/tests/examples/test_new_publisher.rs
@@ -0,0 +1,126 @@
+// temporary file, do not forget to delete!
+
+use std::str::FromStr;
+use std::sync::Arc;
+use std::time::Instant;
+
+use futures::StreamExt;
+use iggy::clients::send_mode::{BackgroundConfig, BackpressureMode, SendMode};
+use iggy::prelude::defaults::*;
+use iggy::prelude::*;
+use iggy::{clients::client::IggyClient, prelude::TcpClient};
+use iggy_common::TcpClientConfig;
+use integration::test_server::{IpAddrKind, TestServer};
+
+#[tokio::test]
+async fn test_new_publisher() {
+    // new
+    let mut server = TestServer::new(None, true, None, IpAddrKind::V4);
+    server.start();
+
+    let tcp_client_config = TcpClientConfig {
+        server_address: server.get_raw_tcp_addr().unwrap(),
+        ..TcpClientConfig::default()
+    };
+    let client = 
Box::new(TcpClient::create(Arc::new(tcp_client_config)).unwrap());
+    let client = IggyClient::create(client, None, None);
+
+    // setup
+    client.connect().await.unwrap();
+    let ping_result = client.ping().await;
+    assert!(ping_result.is_ok(), "Failed to ping server");
+
+    client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    client
+        .create_stream("sample-stream", Some(1))
+        .await
+        .unwrap();
+    client
+        .create_topic(
+            &1.try_into().unwrap(),
+            "sample-topic",
+            1,
+            CompressionAlgorithm::default(),
+            None,
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+
+    let mut producer = client
+        .producer("1", "1")
+        .unwrap()
+        .batch_length(10)
+        .send_mode(SendMode::Sync)
+        // .send_mode(SendMode::Background(BackgroundConfig {
+        //     max_in_flight: 10,
+        //     in_flight_timeout: None,
+        //     batch_size: None,
+        //     failure_mode: BackpressureMode::Block,
+        // }))
+        .build();
+
+    producer.init().await.unwrap();
+
+    // produce
+    let mut send_batches = 0;
+
+    let batch_limit = 10000;
+    let mut count = 0;
+
+    let mut t = Vec::new();
+    while send_batches < batch_limit {
+        let start = Instant::now();
+
+        let mut messages = Vec::new();
+        for _ in 0..10 {
+            let message = 
IggyMessage::from_str(format!("{count}").as_str()).unwrap();
+            messages.push(message);
+            count += 1;
+        }
+
+        producer.send(messages).await.unwrap();
+        send_batches += 1;
+
+        let duration = start.elapsed().as_millis();
+        t.push(duration);
+    }
+
+    // let mut consumer = client
+    //     .consumer_group("some-consumer", "1", "1")
+    //     .unwrap()
+    //     .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
+    //     .create_consumer_group_if_not_exists()
+    //     .auto_join_consumer_group()
+    //     .polling_strategy(PollingStrategy::next())
+    //     .poll_interval(IggyDuration::from_str("1ms").unwrap())
+    //     .batch_length(10)
+    //     .build();
+
+    // consumer.init().await.unwrap();
+
+    // let mut consumed_batches = 0;
+    // while let Some(message) = consumer.next().await {
+    //     if consumed_batches >= batch_limit {
+    //         break
+    //     }
+
+    //     if let Ok(message) = message {
+    //         // println!("got message");
+    //         consumed_batches += 1;
+    //     } else if let Err(error) = message {
+    //         panic!("{}", error.to_string());
+    //     }
+    // }
+
+    let total: u128 = t.iter().sum();
+    let avg = total as f64 / t.len() as f64;
+    println!("Average time per batch: {:.3} ms", avg);
+}
+
+// sync: total execution time: 13.937269459s
diff --git a/core/sdk/src/clients/client.rs b/core/sdk/src/clients/client.rs
index 3e6e1c2b..6bbd224a 100644
--- a/core/sdk/src/clients/client.rs
+++ b/core/sdk/src/clients/client.rs
@@ -155,6 +155,7 @@ impl IggyClient {
             topic.to_owned(),
             self.encryptor.clone(),
             None,
+            
         ))
     }
 }
diff --git a/core/sdk/src/clients/mod.rs b/core/sdk/src/clients/mod.rs
index be854f1b..7067d2c2 100644
--- a/core/sdk/src/clients/mod.rs
+++ b/core/sdk/src/clients/mod.rs
@@ -32,6 +32,7 @@ pub mod consumer;
 pub mod consumer_builder;
 pub mod producer;
 pub mod producer_builder;
+pub mod send_mode;
 
 const ORDERING: std::sync::atomic::Ordering = 
std::sync::atomic::Ordering::SeqCst;
-const MAX_BATCH_SIZE: usize = 1000000;
+const MAX_BATCH_LENGTH: usize = 1000000;
diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index f3706e3a..de4f6410 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -1,3 +1,4 @@
+use super::send_mode::{BackpressureMode, SendMode};
 /* Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-use super::{MAX_BATCH_SIZE, ORDERING};
+use super::{MAX_BATCH_LENGTH, ORDERING};
 
 use bytes::Bytes;
 use futures_util::StreamExt;
@@ -25,12 +26,19 @@ use iggy_common::{
     CompressionAlgorithm, DiagnosticEvent, EncryptorKind, IdKind, Identifier, 
IggyDuration,
     IggyError, IggyExpiry, IggyMessage, IggyTimestamp, MaxTopicSize, 
Partitioner, Partitioning,
 };
+use std::fmt::Debug;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicU64};
 use std::time::Duration;
+use tokio::sync::Semaphore;
+use tokio::task::JoinHandle;
 use tokio::time::{Interval, sleep};
 use tracing::{error, info, trace, warn};
 
+pub trait ErrorCallback: Send + Sync + Debug {
+    fn call(&self, error: IggyError, messages: Vec<IggyMessage>);
+}
+
 unsafe impl Send for IggyProducer {}
 unsafe impl Sync for IggyProducer {}
 
@@ -44,6 +52,7 @@ pub struct IggyProducer {
     topic_name: String,
     batch_length: Option<usize>,
     partitioning: Option<Arc<Partitioning>>,
+    send_mode: Arc<SendMode>,
     encryptor: Option<Arc<EncryptorKind>>,
     partitioner: Option<Arc<dyn Partitioner>>,
     linger_time_micros: u64,
@@ -58,6 +67,11 @@ pub struct IggyProducer {
     last_sent_at: Arc<AtomicU64>,
     send_retries_count: Option<u32>,
     send_retries_interval: Option<IggyDuration>,
+
+    _join_handle: Option<JoinHandle<()>>,
+    sema: Arc<Semaphore>,
+    sender: Option<Arc<flume::Sender<Vec<IggyMessage>>>>,
+    error_callback: Option<Arc<dyn ErrorCallback>>,
 }
 
 impl IggyProducer {
@@ -81,6 +95,8 @@ impl IggyProducer {
         topic_max_size: MaxTopicSize,
         send_retries_count: Option<u32>,
         send_retries_interval: Option<IggyDuration>,
+        send_mode: SendMode,
+        error_callback: Option<Arc<dyn ErrorCallback>>,
     ) -> Self {
         Self {
             initialized: false,
@@ -92,6 +108,7 @@ impl IggyProducer {
             topic_name,
             batch_length,
             partitioning: partitioning.map(Arc::new),
+            send_mode: Arc::new(send_mode),
             encryptor,
             partitioner,
             linger_time_micros: interval.map_or(0, |i| i.as_micros()),
@@ -106,6 +123,10 @@ impl IggyProducer {
             last_sent_at: Arc::new(AtomicU64::new(0)),
             send_retries_count,
             send_retries_interval,
+            _join_handle: None,
+            sema: Arc::new(Semaphore::new(10)),
+            sender: None,
+            error_callback,
         }
     }
 
@@ -179,8 +200,84 @@ impl IggyProducer {
                 .await?;
         }
 
+        let (tx, rx) = flume::bounded::<Vec<IggyMessage>>(100); // TODO: make the channel capacity configurable
+        let stream_id = self.stream_id.clone();
+        let topic_id = self.topic_id.clone();
+        let partitioning = self.partitioning.clone();
+        let default_partitioning = self.default_partitioning.clone();
+        let partitioner = self.partitioner.clone();
+        let client = self.client.clone();
+        let send_retries_count = self.send_retries_count.clone();
+        let send_retries_interval = self.send_retries_interval.clone();
+        let can_send = self.can_send.clone();
+        let sema = self.sema.clone();
+
+        let handle = tokio::spawn(async move {
+            while let Ok(mut batch) = rx.recv_async().await {
+                // let sema = sema.clone();
+                // let partitioner = partitioner.clone();
+                // let stream_id = stream_id.clone();
+                // let topic_id = topic_id.clone();
+                // let partitioning = partitioning.clone();
+                // let default_partitioning = default_partitioning.clone();
+                // let client = client.clone();
+                // let can_send = can_send.clone();
+
+                let partitioning = get_partitioning(
+                    &partitioner,
+                    &stream_id,
+                    &topic_id,
+                    &batch,
+                    partitioning.clone(),
+                    &partitioning,
+                    default_partitioning.clone(),
+                )
+                .unwrap();
+                try_send_messages(
+                    client.clone(),
+                    send_retries_count,
+                    send_retries_interval,
+                    can_send.clone(),
+                    &stream_id,
+                    &topic_id,
+                    &partitioning,
+                    &mut batch,
+                )
+                .await
+                .unwrap();
+                // tokio::spawn(async move {
+                //     let _permit = sema.acquire().await.unwrap();
+                //     let partitioning = get_partitioning(
+                //         &partitioner,
+                //         &stream_id,
+                //         &topic_id,
+                //         &batch,
+                //         partitioning.clone(),
+                //         &partitioning,
+                //         default_partitioning.clone(),
+                //     )
+                //     .unwrap();
+                //     try_send_messages(
+                //         client.clone(),
+                //         send_retries_count,
+                //         send_retries_interval,
+                //         can_send.clone(),
+                //         &stream_id,
+                //         &topic_id,
+                //         &partitioning,
+                //         &mut batch,
+                //     ).await.unwrap();
+                // });
+            }
+        });
+        self._join_handle = Some(handle);
+        self.sender = Some(Arc::new(tx));
         self.initialized = true;
-        info!("Producer has been initialized for stream: {stream_id} and 
topic: {topic_id}.");
+        info!(
+            "Producer has been initialized for stream: {} and topic: {}.",
+            self.stream_id.clone(),
+            self.topic_id.clone()
+        );
         Ok(())
     }
 
@@ -293,6 +390,7 @@ impl IggyProducer {
             .await
     }
 
+    // TODO add batch_size
     async fn send_buffered(
         &self,
         stream: Arc<Identifier>,
@@ -301,8 +399,18 @@ impl IggyProducer {
         partitioning: Option<Arc<Partitioning>>,
     ) -> Result<(), IggyError> {
         self.encrypt_messages(&mut messages)?;
-        let partitioning = self.get_partitioning(&stream, &topic, &messages, 
partitioning)?;
-        let batch_length = self.batch_length.unwrap_or(MAX_BATCH_SIZE);
+        let default_partitioning = self.default_partitioning.clone();
+        let partitioner = self.partitioner.clone();
+        let partitioning = get_partitioning(
+            &partitioner,
+            &stream,
+            &topic,
+            &messages,
+            partitioning,
+            &self.partitioning,
+            default_partitioning,
+        )?;
+        let batch_length = self.batch_length.unwrap_or(MAX_BATCH_LENGTH);
         let batches = messages.chunks_mut(batch_length);
         let mut current_batch = 1;
         let batches_count = batches.len();
@@ -321,8 +429,30 @@ impl IggyProducer {
             );
             self.last_sent_at
                 .store(IggyTimestamp::now().into(), ORDERING);
-            self.try_send_messages(&self.stream_id, &self.topic_id, 
&partitioning, batch)
-                .await?;
+
+            let client = self.client.clone();
+            let send_retries_count = self.send_retries_count.clone();
+            let send_retries_interval = self.send_retries_interval.clone();
+            let can_send = self.can_send.clone();
+
+            let sender = self.sender.clone();
+            let send_mode = self.send_mode.clone();
+            try_send_messages_new(
+                BackpressureMode::Block,
+                sender,
+                self.error_callback.clone(),
+                send_mode,
+                client,
+                send_retries_count,
+                send_retries_interval,
+                can_send,
+                &self.stream_id,
+                &self.topic_id,
+                &partitioning,
+                batch,
+            )
+            .await?;
+
             trace!("Sent {messages_count} messages 
({current_batch}/{batches_count} batch(es)).");
             current_batch += 1;
         }
@@ -338,21 +468,56 @@ impl IggyProducer {
     ) -> Result<(), IggyError> {
         trace!("No batch size specified, sending messages immediately.");
         self.encrypt_messages(&mut messages)?;
-        let partitioning = self.get_partitioning(stream, topic, &messages, 
partitioning)?;
-        let batch_length = self.batch_length.unwrap_or(MAX_BATCH_SIZE);
+        let default_partitioning = self.default_partitioning.clone();
+        let partitioner = self.partitioner.clone();
+        let client = self.client.clone();
+        let send_retries_count = self.send_retries_count.clone();
+        let send_retries_interval = self.send_retries_interval.clone();
+        let can_send = self.can_send.clone();
+
+        let partitioning = get_partitioning(
+            &partitioner,
+            &stream,
+            &topic,
+            &messages,
+            partitioning,
+            &self.partitioning,
+            default_partitioning,
+        )?;
+        let batch_length = self.batch_length.unwrap_or(MAX_BATCH_LENGTH);
         if messages.len() <= batch_length {
             self.last_sent_at
                 .store(IggyTimestamp::now().into(), ORDERING);
-            self.try_send_messages(stream, topic, &partitioning, &mut messages)
-                .await?;
+            try_send_messages(
+                client.clone(),
+                send_retries_count,
+                send_retries_interval,
+                can_send,
+                stream,
+                topic,
+                &partitioning,
+                &mut messages,
+            )
+            .await?;
             return Ok(());
         }
 
         for batch in messages.chunks_mut(batch_length) {
+            let client = client.clone();
+            let can_send = can_send.clone();
             self.last_sent_at
                 .store(IggyTimestamp::now().into(), ORDERING);
-            self.try_send_messages(stream, topic, &partitioning, batch)
-                .await?;
+            try_send_messages(
+                client,
+                send_retries_count,
+                send_retries_interval,
+                can_send,
+                stream,
+                topic,
+                &partitioning,
+                batch,
+            )
+            .await?;
         }
         Ok(())
     }
@@ -385,144 +550,236 @@ impl IggyProducer {
         }
         Ok(())
     }
+}
 
-    async fn try_send_messages(
-        &self,
-        stream: &Identifier,
-        topic: &Identifier,
-        partitioning: &Arc<Partitioning>,
-        messages: &mut [IggyMessage],
-    ) -> Result<(), IggyError> {
-        let client = self.client.read().await;
-        let Some(max_retries) = self.send_retries_count else {
-            return client
-                .send_messages(stream, topic, partitioning, messages)
-                .await;
-        };
+fn get_partitioning(
+    partitioner: &Option<Arc<dyn Partitioner>>,
+    stream: &Identifier,
+    topic: &Identifier,
+    messages: &[IggyMessage],
+    partitioning: Option<Arc<Partitioning>>,
+    producer_partitioning: &Option<Arc<Partitioning>>,
+    default_partitioning: Arc<Partitioning>,
+) -> Result<Arc<Partitioning>, IggyError> {
+    if let Some(partitioner) = partitioner {
+        trace!("Calculating partition id using custom partitioner.");
+        let partition_id = partitioner.calculate_partition_id(stream, topic, 
messages)?;
+        Ok(Arc::new(Partitioning::partition_id(partition_id)))
+    } else {
+        trace!("Using the provided partitioning.");
+        Ok(partitioning.unwrap_or_else(|| {
+            producer_partitioning
+                .clone()
+                .unwrap_or_else(|| default_partitioning.clone())
+        }))
+    }
+}
 
-        if max_retries == 0 {
-            return client
-                .send_messages(stream, topic, partitioning, messages)
-                .await;
+async fn wait_until_connected(
+    can_send: Arc<AtomicBool>,
+    max_retries: u32,
+    stream: &Identifier,
+    topic: &Identifier,
+    timer: &mut Option<Interval>,
+) -> Result<(), IggyError> {
+    let mut retries = 0;
+    while !can_send.load(ORDERING) {
+        retries += 1;
+        if retries > max_retries {
+            error!(
+                "Failed to send messages to topic: {topic}, stream: {stream} \
+                 after {max_retries} retries. Client is disconnected."
+            );
+            return Err(IggyError::CannotSendMessagesDueToClientDisconnection);
         }
 
-        let mut timer = if let Some(interval) = self.send_retries_interval {
-            let mut timer = tokio::time::interval(interval.get_duration());
-            timer.tick().await;
-            Some(timer)
-        } else {
-            None
-        };
+        error!(
+            "Trying to send messages to topic: {topic}, stream: {stream} \
+             but the client is disconnected. Retrying 
{retries}/{max_retries}..."
+        );
 
-        self.wait_until_connected(max_retries, stream, topic, &mut timer)
-            .await?;
-        self.send_with_retries(
-            max_retries,
-            stream,
-            topic,
-            partitioning,
-            messages,
-            &mut timer,
-        )
-        .await
+        if let Some(timer) = timer.as_mut() {
+            trace!(
+                "Waiting for the next retry to send messages to topic: 
{topic}, \
+                 stream: {stream} for disconnected client..."
+            );
+            timer.tick().await;
+        }
     }
+    Ok(())
+}
+
+async fn send_with_retries(
+    client: Arc<IggySharedMut<Box<dyn Client>>>,
+    max_retries: u32,
+    stream: &Identifier,
+    topic: &Identifier,
+    partitioning: &Arc<Partitioning>,
+    messages: &mut [IggyMessage],
+    timer: &mut Option<Interval>,
+) -> Result<(), IggyError> {
+    let client = client.read().await;
+    let mut retries = 0;
+    loop {
+        match client
+            .send_messages(stream, topic, partitioning, messages)
+            .await
+        {
+            Ok(_) => return Ok(()),
+            Err(error) => {
+                retries += 1;
+                if retries > max_retries {
+                    error!(
+                        "Failed to send messages to topic: {topic}, stream: 
{stream} \
+                         after {max_retries} retries. {error}."
+                    );
+                    return Err(error);
+                }
 
-    async fn wait_until_connected(
-        &self,
-        max_retries: u32,
-        stream: &Identifier,
-        topic: &Identifier,
-        timer: &mut Option<Interval>,
-    ) -> Result<(), IggyError> {
-        let mut retries = 0;
-        while !self.can_send.load(ORDERING) {
-            retries += 1;
-            if retries > max_retries {
                 error!(
-                    "Failed to send messages to topic: {topic}, stream: 
{stream} \
-                     after {max_retries} retries. Client is disconnected."
+                    "Failed to send messages to topic: {topic}, stream: 
{stream}. \
+                     {error} Retrying {retries}/{max_retries}..."
                 );
-                return 
Err(IggyError::CannotSendMessagesDueToClientDisconnection);
-            }
 
-            error!(
-                "Trying to send messages to topic: {topic}, stream: {stream} \
-                 but the client is disconnected. Retrying 
{retries}/{max_retries}..."
-            );
-
-            if let Some(timer) = timer.as_mut() {
-                trace!(
-                    "Waiting for the next retry to send messages to topic: 
{topic}, \
-                     stream: {stream} for disconnected client..."
-                );
-                timer.tick().await;
+                if let Some(t) = timer.as_mut() {
+                    trace!(
+                        "Waiting for the next retry to send messages to topic: 
{topic}, \
+                         stream: {stream}..."
+                    );
+                    t.tick().await;
+                }
             }
         }
-        Ok(())
     }
+}
 
-    async fn send_with_retries(
-        &self,
-        max_retries: u32,
-        stream: &Identifier,
-        topic: &Identifier,
-        partitioning: &Arc<Partitioning>,
-        messages: &mut [IggyMessage],
-        timer: &mut Option<Interval>,
-    ) -> Result<(), IggyError> {
-        let client = self.client.read().await;
-        let mut retries = 0;
-        loop {
-            match client
-                .send_messages(stream, topic, partitioning, messages)
-                .await
-            {
-                Ok(_) => return Ok(()),
-                Err(error) => {
-                    retries += 1;
-                    if retries > max_retries {
-                        error!(
-                            "Failed to send messages to topic: {topic}, 
stream: {stream} \
-                             after {max_retries} retries. {error}."
-                        );
-                        return Err(error);
+async fn try_send_messages(
+    client: Arc<IggySharedMut<Box<dyn Client>>>,
+    send_retries_count: Option<u32>,
+    send_retries_interval: Option<IggyDuration>,
+    can_send: Arc<AtomicBool>,
+    stream: &Identifier,
+    topic: &Identifier,
+    partitioning: &Arc<Partitioning>,
+    messages: &mut [IggyMessage],
+) -> Result<(), IggyError> {
+    let rw_client = client.read().await;
+    let Some(max_retries) = send_retries_count else {
+        return rw_client
+            .send_messages(stream, topic, partitioning, messages)
+            .await;
+    };
+
+    if max_retries == 0 {
+        return rw_client
+            .send_messages(stream, topic, partitioning, messages)
+            .await;
+    }
+
+    let mut timer = if let Some(interval) = send_retries_interval {
+        let mut timer = tokio::time::interval(interval.get_duration());
+        timer.tick().await;
+        Some(timer)
+    } else {
+        None
+    };
+
+    wait_until_connected(can_send.clone(), max_retries, stream, topic, &mut 
timer).await?;
+    send_with_retries(
+        client.clone(),
+        max_retries,
+        stream,
+        topic,
+        partitioning,
+        messages,
+        &mut timer,
+    )
+    .await
+}
+
+async fn try_send_messages_new(
+    backpressure_mode: BackpressureMode,
+    sender: Option<Arc<flume::Sender<Vec<IggyMessage>>>>,
+    error_callback: Option<Arc<dyn ErrorCallback>>,
+    send_mode: Arc<SendMode>,
+    client: Arc<IggySharedMut<Box<dyn Client>>>,
+    send_retries_count: Option<u32>,
+    send_retries_interval: Option<IggyDuration>,
+    can_send: Arc<AtomicBool>,
+    stream: &Identifier,
+    topic: &Identifier,
+    partitioning: &Arc<Partitioning>,
+    messages: &mut [IggyMessage],
+) -> Result<(), IggyError> {
+    match &*send_mode {
+        SendMode::Sync => {
+            try_send_messages(
+                client,
+                send_retries_count,
+                send_retries_interval,
+                can_send,
+                stream,
+                topic,
+                partitioning,
+                messages,
+            )
+            .await
+        }
+
+        SendMode::Background(_cfg) => {
+            let out: Vec<IggyMessage> = 
messages.iter_mut().map(std::mem::take).collect();
+            let sender = sender.as_ref().expect("init() must set 
sender").clone();
+
+            match sender.try_send(out) {
+                Ok(()) => Ok(()),
+
+                Err(flume::TrySendError::Full(out)) => match backpressure_mode 
{
+                    BackpressureMode::Block => match 
sender.send_async(out).await {
+                        Ok(()) => Ok(()),
+                        Err(flume::SendError(out)) => {
+                            if let Some(cb) = error_callback {
+                                cb.call(IggyError::BackgroundSendError, out);
+                            }
+                            Err(IggyError::BackgroundSendError)
+                        }
+                    },
+
+                    BackpressureMode::BlockWithTimeout(t) => {
+                        match tokio::time::timeout(t.get_duration(), 
sender.send_async(out)).await {
+                            Ok(Ok(())) => Ok(()),
+
+                            Ok(Err(flume::SendError(out))) => {
+                                if let Some(cb) = error_callback {
+                                    cb.call(IggyError::BackgroundSendTimeout, 
out);
+                                }
+                                Err(IggyError::BackgroundSendTimeout)
+                            }
+
+                            Err(_) => {
+                                if let Some(cb) = &error_callback {
+                                    cb.call(IggyError::BackgroundSendTimeout, 
vec![]);
+                                }
+                                Err(IggyError::BackgroundSendTimeout)
+                            }
+                        }
                     }
 
-                    error!(
-                        "Failed to send messages to topic: {topic}, stream: 
{stream}. \
-                         {error} Retrying {retries}/{max_retries}..."
-                    );
+                    BackpressureMode::FailImmediately => {
+                        if let Some(cb) = &error_callback {
+                            cb.call(IggyError::BackgroundSendBufferFull, out);
+                        }
+                        Err(IggyError::BackgroundSendBufferFull)
+                    }
+                },
 
-                    if let Some(t) = timer.as_mut() {
-                        trace!(
-                            "Waiting for the next retry to send messages to 
topic: {topic}, \
-                             stream: {stream}..."
-                        );
-                        t.tick().await;
+                Err(flume::TrySendError::Disconnected(out)) => {
+                    if let Some(cb) = &error_callback {
+                        cb.call(IggyError::BackgroundWorkerDisconnected, out);
                     }
+                    error!("Background worker has shut down.");
+                    Err(IggyError::BackgroundWorkerDisconnected)
                 }
             }
         }
     }
-
-    fn get_partitioning(
-        &self,
-        stream: &Identifier,
-        topic: &Identifier,
-        messages: &[IggyMessage],
-        partitioning: Option<Arc<Partitioning>>,
-    ) -> Result<Arc<Partitioning>, IggyError> {
-        if let Some(partitioner) = &self.partitioner {
-            trace!("Calculating partition id using custom partitioner.");
-            let partition_id = partitioner.calculate_partition_id(stream, 
topic, messages)?;
-            Ok(Arc::new(Partitioning::partition_id(partition_id)))
-        } else {
-            trace!("Using the provided partitioning.");
-            Ok(partitioning.unwrap_or_else(|| {
-                self.partitioning
-                    .clone()
-                    .unwrap_or_else(|| self.default_partitioning.clone())
-            }))
-        }
-    }
 }
diff --git a/core/sdk/src/clients/producer_builder.rs 
b/core/sdk/src/clients/producer_builder.rs
index 76ecfeea..4d9590f4 100644
--- a/core/sdk/src/clients/producer_builder.rs
+++ b/core/sdk/src/clients/producer_builder.rs
@@ -15,8 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use super::MAX_BATCH_SIZE;
-use crate::prelude::IggyProducer;
+use super::send_mode::{self, SendMode};
+use super::MAX_BATCH_LENGTH;
+use crate::prelude::{ErrorCallback, IggyProducer};
 use iggy_binary_protocol::Client;
 use iggy_common::locking::IggySharedMut;
 use iggy_common::{
@@ -36,6 +37,7 @@ pub struct IggyProducerBuilder {
     encryptor: Option<Arc<EncryptorKind>>,
     partitioner: Option<Arc<dyn Partitioner>>,
     linger_time: Option<IggyDuration>,
+    send_mode: SendMode,
     create_stream_if_not_exists: bool,
     create_topic_if_not_exists: bool,
     topic_partitions_count: u32,
@@ -44,6 +46,7 @@ pub struct IggyProducerBuilder {
     send_retries_interval: Option<IggyDuration>,
     topic_message_expiry: IggyExpiry,
     topic_max_size: MaxTopicSize,
+    error_callback: Option<Arc<dyn ErrorCallback>>,
 }
 
 impl IggyProducerBuilder {
@@ -67,6 +70,7 @@ impl IggyProducerBuilder {
             partitioning: None,
             encryptor,
             partitioner,
+            send_mode: SendMode::default(),
             linger_time: Some(IggyDuration::from(1000)),
             create_stream_if_not_exists: true,
             create_topic_if_not_exists: true,
@@ -76,6 +80,7 @@ impl IggyProducerBuilder {
             topic_max_size: MaxTopicSize::ServerDefault,
             send_retries_count: Some(3),
             send_retries_interval: Some(IggyDuration::ONE_SECOND),
+            error_callback: None,
         }
     }
 
@@ -95,7 +100,7 @@ impl IggyProducerBuilder {
             batch_length: if batch_length == 0 {
                 None
             } else {
-                Some(batch_length.min(MAX_BATCH_SIZE as u32) as usize)
+                Some(batch_length.min(MAX_BATCH_LENGTH as u32) as usize)
             },
             ..self
         }
@@ -125,6 +130,22 @@
         }
     }
 
+    /// Sets the send mode: synchronous or background delivery of messages.
+    pub fn send_mode(self, send_mode: SendMode) -> Self {
+        Self {
+            send_mode,
+            ..self
+        }
+    }
+
+    /// Sets the callback invoked when a background send fails.
+    pub fn error_callback(self, cb: Arc<dyn ErrorCallback>) -> Self {
+        Self {
+            error_callback: Some(cb),
+            ..self
+        }
+    }
+
     /// Sets the encryptor for encrypting the messages' payloads.
     pub fn encryptor(self, encryptor: Arc<EncryptorKind>) -> Self {
         Self {
@@ -249,6 +268,8 @@ impl IggyProducerBuilder {
             self.topic_max_size,
             self.send_retries_count,
             self.send_retries_interval,
+            self.send_mode,
+            self.error_callback,
         )
     }
 }
diff --git a/core/sdk/src/clients/send_mode.rs 
b/core/sdk/src/clients/send_mode.rs
new file mode 100644
index 00000000..98b3abef
--- /dev/null
+++ b/core/sdk/src/clients/send_mode.rs
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iggy_common::IggyDuration;
+
+#[derive(Debug, Clone, Default)]
+pub enum SendMode {
+    #[default]
+    Sync,
+    Background(BackgroundConfig),
+}
+
+/// Determines how the `send_messages` API should behave when a problem is encountered.
+#[derive(Debug, Clone)]
+pub enum BackpressureMode {
+    /// Block until the send succeeds
+    Block,
+    /// Block with a timeout, after which the send fails
+    BlockWithTimeout(IggyDuration),
+    /// Fail immediately without retrying
+    FailImmediately,
+}
+
+#[derive(Debug, Clone)]
+pub struct BackgroundConfig {
+    pub max_in_flight: usize,
+    pub in_flight_timeout: Option<IggyDuration>,
+    pub batch_size: Option<usize>,
+    pub failure_mode: BackpressureMode,
+}
diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs
index 3d87cc82..9b13f218 100644
--- a/core/sdk/src/prelude.rs
+++ b/core/sdk/src/prelude.rs
@@ -35,7 +35,7 @@ pub use crate::clients::consumer::{
     AutoCommit, AutoCommitAfter, AutoCommitWhen, IggyConsumer, ReceivedMessage,
 };
 pub use crate::clients::consumer_builder::IggyConsumerBuilder;
-pub use crate::clients::producer::IggyProducer;
+pub use crate::clients::producer::{ErrorCallback, IggyProducer};
 pub use crate::clients::producer_builder::IggyProducerBuilder;
 pub use crate::consumer_ext::IggyConsumerMessageExt;
 pub use crate::stream_builder::IggyConsumerConfig;


Reply via email to