This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch feat/add-background-send
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 85334f59ad52bbf1663d3e1a88676ff1e4189ffd
Author: haze518 <[email protected]>
AuthorDate: Sat May 24 09:22:23 2025 +0600

    old variant
---
 Cargo.lock                                         | 146 ++++++
 core/common/src/error/iggy_error.rs                |   8 +
 core/common/src/types/message/iggy_message.rs      |   2 +-
 core/integration/Cargo.toml                        |   5 +
 core/integration/benches/send.rs                   | 102 ++++
 core/integration/tests/examples/mod.rs             |   1 +
 .../tests/examples/test_new_publisher.rs           | 131 +++++
 core/sdk/src/clients/client.rs                     |   1 +
 core/sdk/src/clients/mod.rs                        |   3 +-
 core/sdk/src/clients/producer.rs                   | 531 +++++++++++++++------
 core/sdk/src/clients/producer_builder.rs           |  27 +-
 core/sdk/src/clients/send_mode.rs                  |  27 ++
 core/sdk/src/prelude.rs                            |   2 +-
 13 files changed, 846 insertions(+), 140 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 96dee5e0..f5ed8db8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -343,6 +343,12 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "anes"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299"
+
 [[package]]
 name = "anstream"
 version = "0.6.18"
@@ -1260,6 +1266,12 @@ dependencies = [
  "thiserror 2.0.12",
 ]
 
+[[package]]
+name = "cast"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
+
 [[package]]
 name = "cc"
 version = "1.2.22"
@@ -1327,6 +1339,33 @@ dependencies = [
  "windows-link",
 ]
 
+[[package]]
+name = "ciborium"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e"
+dependencies = [
+ "ciborium-io",
+ "ciborium-ll",
+ "serde",
+]
+
+[[package]]
+name = "ciborium-io"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757"
+
+[[package]]
+name = "ciborium-ll"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9"
+dependencies = [
+ "ciborium-io",
+ "half",
+]
+
 [[package]]
 name = "cipher"
 version = "0.4.4"
@@ -1620,6 +1659,40 @@ dependencies = [
  "cfg-if",
 ]
 
+[[package]]
+name = "criterion"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "3bf7af66b0989381bd0be551bd7cc91912a655a58c6918420c9527b1fd8b4679"
+dependencies = [
+ "anes",
+ "cast",
+ "ciborium",
+ "clap",
+ "criterion-plot",
+ "itertools 0.13.0",
+ "num-traits",
+ "oorandom",
+ "plotters",
+ "rayon",
+ "regex",
+ "serde",
+ "serde_json",
+ "tinytemplate",
+ "tokio",
+ "walkdir",
+]
+
+[[package]]
+name = "criterion-plot"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1"
+dependencies = [
+ "cast",
+ "itertools 0.10.5",
+]
+
 [[package]]
 name = "crossbeam"
 version = "0.8.4"
@@ -3016,6 +3089,16 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "half"
+version = "2.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "459196ed295495a68f7d7fe1d84f6c4b7ff0e21fe3017b2f283c6fac3ad803c9"
+dependencies = [
+ "cfg-if",
+ "crunchy",
+]
+
 [[package]]
 name = "handlebars"
 version = "4.5.0"
@@ -3672,6 +3755,7 @@ dependencies = [
  "async-trait",
  "bytes",
  "chrono",
+ "criterion",
  "ctor",
  "derive_more 2.0.1",
  "env_logger",
@@ -3719,6 +3803,15 @@ version = "1.70.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
 
+[[package]]
+name = "itertools"
+version = "0.10.5"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473"
+dependencies = [
+ "either",
+]
+
 [[package]]
 name = "itertools"
 version = "0.12.1"
@@ -3728,6 +3821,15 @@ dependencies = [
  "either",
 ]
 
+[[package]]
+name = "itertools"
+version = "0.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186"
+dependencies = [
+ "either",
+]
+
 [[package]]
 name = "itertools"
 version = "0.14.0"
@@ -4564,6 +4666,12 @@ version = "1.21.3"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
 
+[[package]]
+name = "oorandom"
+version = "11.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e"
+
 [[package]]
 name = "opaque-debug"
 version = "0.3.1"
@@ -4958,6 +5066,34 @@ version = "0.3.32"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c"
 
+[[package]]
+name = "plotters"
+version = "0.3.7"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747"
+dependencies = [
+ "num-traits",
+ "plotters-backend",
+ "plotters-svg",
+ "wasm-bindgen",
+ "web-sys",
+]
+
+[[package]]
+name = "plotters-backend"
+version = "0.3.7"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a"
+
+[[package]]
+name = "plotters-svg"
+version = "0.3.7"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670"
+dependencies = [
+ "plotters-backend",
+]
+
 [[package]]
 name = "polonius-the-crab"
 version = "0.2.1"
@@ -6647,6 +6783,16 @@ dependencies = [
  "zerovec",
 ]
 
+[[package]]
+name = "tinytemplate"
+version = "1.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc"
+dependencies = [
+ "serde",
+ "serde_json",
+]
+
 [[package]]
 name = "tinyvec"
 version = "1.9.0"
diff --git a/core/common/src/error/iggy_error.rs 
b/core/common/src/error/iggy_error.rs
index b8076e2e..5e812288 100644
--- a/core/common/src/error/iggy_error.rs
+++ b/core/common/src/error/iggy_error.rs
@@ -361,6 +361,14 @@ pub enum IggyError {
     TooSmallMessage(u32, u32) = 4037,
     #[error("Cannot sed messages due to client disconnection")]
     CannotSendMessagesDueToClientDisconnection = 4050,
+    #[error("Background send error")]
+    BackgroundSendError = 4051,
+    #[error("Background send timeout")]
+    BackgroundSendTimeout = 4052,
+    #[error("Background send buffer is full")]
+    BackgroundSendBufferFull = 4053,
+    #[error("Background worker disconnected")]
+    BackgroundWorkerDisconnected = 4054,
     #[error("Invalid offset: {0}")]
     InvalidOffset(u64) = 4100,
     #[error("Consumer group with ID: {0} for topic with ID: {1} was not 
found.")]
diff --git a/core/common/src/types/message/iggy_message.rs 
b/core/common/src/types/message/iggy_message.rs
index 54c3a841..ddd4076f 100644
--- a/core/common/src/types/message/iggy_message.rs
+++ b/core/common/src/types/message/iggy_message.rs
@@ -105,7 +105,7 @@ pub const MAX_USER_HEADERS_SIZE: u32 = 100 * 1000;
 ///     .build()
 ///     .unwrap();
 /// ```
-#[derive(Debug, PartialEq)]
+#[derive(Debug, PartialEq, Default)]
 pub struct IggyMessage {
     /// Message metadata
     pub header: IggyMessageHeader,
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index 46cc8847..1eaf83ce 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -32,6 +32,7 @@ assert_cmd = "2.0.17"
 async-trait = { workspace = true }
 bytes = { workspace = true }
 chrono = { workspace = true }
+criterion = { version = "0.6.0", features = ["async", "async_tokio"] }
 ctor = "0.4.2"
 derive_more = { workspace = true }
 env_logger = { workspace = true }
@@ -55,3 +56,7 @@ tracing-subscriber = { workspace = true }
 twox-hash = { workspace = true }
 uuid = { workspace = true }
 zip = { workspace = true }
+
+[[bench]]
+name = "send"
+harness = false
diff --git a/core/integration/benches/send.rs b/core/integration/benches/send.rs
new file mode 100644
index 00000000..1bb7b316
--- /dev/null
+++ b/core/integration/benches/send.rs
@@ -0,0 +1,102 @@
+use std::sync::Arc;
+
+use bytes::Bytes;
+use criterion::{criterion_group, criterion_main, Criterion};
+use iggy::clients::send_mode::{BackgroundConfig, BackpressureMode, SendMode};
+use iggy::prelude::*;
+use iggy::{clients::client::IggyClient, prelude::TcpClient};
+use iggy_common::TcpClientConfig;
+use integration::test_server::{IpAddrKind, TestServer};
+use tokio::runtime::Runtime;
+
+fn bench_send(c: &mut Criterion) {
+    let rt = Runtime::new().unwrap();
+
+    // SETUP вне замера
+    let (mut producer, batch): (IggyProducer, Vec<IggyMessage>) = 
rt.block_on(async {
+        // старт сервера
+        let mut server = TestServer::new(None, true, None, IpAddrKind::V4);
+        server.start();
+
+        let tcp_client_config = TcpClientConfig {
+            server_address: server.get_raw_tcp_addr().unwrap(),
+            ..TcpClientConfig::default()
+        };
+
+        let client = 
Box::new(TcpClient::create(Arc::new(tcp_client_config)).unwrap());
+        let client = IggyClient::create(client, None, None);
+
+        client.connect().await.unwrap();
+        client
+            .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+            .await
+            .unwrap();
+        client
+            .create_stream("sample-stream", Some(1))
+            .await
+            .unwrap();
+        client
+            .create_topic(
+                &1.try_into().unwrap(),
+                "sample-topic",
+                1,
+                CompressionAlgorithm::default(),
+                None,
+                None,
+                IggyExpiry::NeverExpire,
+                MaxTopicSize::ServerDefault,
+            )
+            .await
+            .unwrap();
+
+        let mut producer = client
+            .producer("1", "1")
+            .unwrap()
+            .batch_length(10)
+            .send_mode(SendMode::Background(BackgroundConfig {
+                max_in_flight: 10,
+                in_flight_timeout: None,
+                batch_size: None,
+                failure_mode: BackpressureMode::Block,
+            }))
+            .build();
+
+        producer.init().await.unwrap();
+
+        // заранее создаём батч
+        let mut messages = Vec::new();
+        for _ in 0..10 {
+            let payload = Bytes::from(vec![0u8; 1024]);
+            let msg = IggyMessage::builder()
+                .payload(payload)
+                .build()
+                .unwrap();
+            messages.push(msg);
+        }
+
+        (producer, messages)
+    });
+
+    c.bench_function("producer.send(batch)", |b| {
+        b.to_async(&rt).iter(|| {
+            let batch = create_batch();
+            producer.send(batch)
+        });
+    });
+}
+
+fn create_batch() -> Vec<IggyMessage> {
+    let mut messages = Vec::new();
+    for _ in 0..1 {
+        let payload = Bytes::from(vec![0u8; 1024]);
+        let msg = IggyMessage::builder()
+            .payload(payload)
+            .build()
+            .unwrap();
+        messages.push(msg);
+    }
+    messages
+}
+
+criterion_group!(benches, bench_send);
+criterion_main!(benches);
diff --git a/core/integration/tests/examples/mod.rs 
b/core/integration/tests/examples/mod.rs
index 015eaa87..8d960e66 100644
--- a/core/integration/tests/examples/mod.rs
+++ b/core/integration/tests/examples/mod.rs
@@ -20,6 +20,7 @@ mod test_basic;
 mod test_getting_started;
 mod test_message_envelope;
 mod test_message_headers;
+mod test_new_publisher;
 
 use assert_cmd::Command;
 use iggy::clients::client::IggyClient;
diff --git a/core/integration/tests/examples/test_new_publisher.rs 
b/core/integration/tests/examples/test_new_publisher.rs
new file mode 100644
index 00000000..771fac8f
--- /dev/null
+++ b/core/integration/tests/examples/test_new_publisher.rs
@@ -0,0 +1,131 @@
+// temporary file, do not forget to delete!
+
+use std::str::FromStr;
+use std::sync::Arc;
+use std::time::Instant;
+
+use bytes::Bytes;
+use futures::StreamExt;
+use iggy::clients::send_mode::{BackgroundConfig, BackpressureMode, SendMode};
+use iggy::prelude::defaults::*;
+use iggy::prelude::*;
+use iggy::{clients::client::IggyClient, prelude::TcpClient};
+use iggy_common::TcpClientConfig;
+use integration::test_server::{IpAddrKind, TestServer};
+
+#[tokio::test]
+async fn test_new_publisher() {
+    // new
+    let mut server = TestServer::new(None, true, None, IpAddrKind::V4);
+    server.start();
+
+    let tcp_client_config = TcpClientConfig {
+        server_address: server.get_raw_tcp_addr().unwrap(),
+        ..TcpClientConfig::default()
+    };
+    let client = 
Box::new(TcpClient::create(Arc::new(tcp_client_config)).unwrap());
+    let client = IggyClient::create(client, None, None);
+
+    // setup
+    client.connect().await.unwrap();
+    let ping_result = client.ping().await;
+    assert!(ping_result.is_ok(), "Failed to ping server");
+
+    client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    client
+        .create_stream("sample-stream", Some(1))
+        .await
+        .unwrap();
+    client
+        .create_topic(
+            &1.try_into().unwrap(),
+            "sample-topic",
+            1,
+            CompressionAlgorithm::default(),
+            None,
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+
+    let mut producer = client
+        .producer("1", "1")
+        .unwrap()
+        .batch_length(10)
+        .send_mode(SendMode::Sync)
+        // .send_mode(SendMode::Background(BackgroundConfig {
+        //     max_in_flight: 10,
+        //     in_flight_timeout: None,
+        //     batch_size: None,
+        //     failure_mode: BackpressureMode::Block,
+        // }))
+        .build();
+
+    producer.init().await.unwrap();
+
+    // produce
+    let mut send_batches = 0;
+
+    let batch_limit = 10000;
+    let mut count = 0;
+
+    let mut t = Vec::new();
+    while send_batches < batch_limit {
+        let start = Instant::now();
+
+        let mut messages = Vec::new();
+        for _ in 0..10 {
+            let payload = Bytes::from(vec![0u8; 1024]);
+            let msg = IggyMessage::builder()
+                .payload(payload)
+                .build()
+                .unwrap();
+            messages.push(msg);
+            count += 1;
+        }
+
+        producer.send(messages).await.unwrap();
+        send_batches += 1;
+
+        let duration = start.elapsed().as_millis();
+        t.push(duration);
+    }
+
+    // let mut consumer = client
+    //     .consumer_group("some-consumer", "1", "1")
+    //     .unwrap()
+    //     .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
+    //     .create_consumer_group_if_not_exists()
+    //     .auto_join_consumer_group()
+    //     .polling_strategy(PollingStrategy::next())
+    //     .poll_interval(IggyDuration::from_str("1ms").unwrap())
+    //     .batch_length(10)
+    //     .build();
+
+    // consumer.init().await.unwrap();
+
+    // let mut consumed_batches = 0;
+    // while let Some(message) = consumer.next().await {
+    //     if consumed_batches >= batch_limit {
+    //         break
+    //     }
+
+    //     if let Ok(message) = message {
+    //         // println!("got message");
+    //         consumed_batches += 1;
+    //     } else if let Err(error) = message {
+    //         panic!("{}", error.to_string());
+    //     }
+    // }
+
+    let total: u128 = t.iter().sum();
+    let avg = total as f64 / t.len() as f64;
+    println!("Среднее время выполнения одного батча: {:.3} мс", avg);
+}
+
+// sync: Время выполнения: 13.937269459s
diff --git a/core/sdk/src/clients/client.rs b/core/sdk/src/clients/client.rs
index 3e6e1c2b..6bbd224a 100644
--- a/core/sdk/src/clients/client.rs
+++ b/core/sdk/src/clients/client.rs
@@ -155,6 +155,7 @@ impl IggyClient {
             topic.to_owned(),
             self.encryptor.clone(),
             None,
+            
         ))
     }
 }
diff --git a/core/sdk/src/clients/mod.rs b/core/sdk/src/clients/mod.rs
index be854f1b..7067d2c2 100644
--- a/core/sdk/src/clients/mod.rs
+++ b/core/sdk/src/clients/mod.rs
@@ -32,6 +32,7 @@ pub mod consumer;
 pub mod consumer_builder;
 pub mod producer;
 pub mod producer_builder;
+pub mod send_mode;
 
 const ORDERING: std::sync::atomic::Ordering = 
std::sync::atomic::Ordering::SeqCst;
-const MAX_BATCH_SIZE: usize = 1000000;
+const MAX_BATCH_LENGTH: usize = 1000000;
diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index f3706e3a..c0457538 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -1,3 +1,4 @@
+use super::send_mode::{BackpressureMode, SendMode};
 /* Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-use super::{MAX_BATCH_SIZE, ORDERING};
+use super::{MAX_BATCH_LENGTH, ORDERING};
 
 use bytes::Bytes;
 use futures_util::StreamExt;
@@ -25,12 +26,19 @@ use iggy_common::{
     CompressionAlgorithm, DiagnosticEvent, EncryptorKind, IdKind, Identifier, 
IggyDuration,
     IggyError, IggyExpiry, IggyMessage, IggyTimestamp, MaxTopicSize, 
Partitioner, Partitioning,
 };
+use std::fmt::Debug;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicU64};
-use std::time::Duration;
+use std::time::{Duration, Instant};
+use tokio::sync::Semaphore;
+use tokio::task::JoinHandle;
 use tokio::time::{Interval, sleep};
 use tracing::{error, info, trace, warn};
 
+pub trait ErrorCallback: Send + Sync + Debug {
+    fn call(&self, error: IggyError, messages: Vec<IggyMessage>);
+}
+
 unsafe impl Send for IggyProducer {}
 unsafe impl Sync for IggyProducer {}
 
@@ -44,6 +52,7 @@ pub struct IggyProducer {
     topic_name: String,
     batch_length: Option<usize>,
     partitioning: Option<Arc<Partitioning>>,
+    send_mode: Arc<SendMode>,
     encryptor: Option<Arc<EncryptorKind>>,
     partitioner: Option<Arc<dyn Partitioner>>,
     linger_time_micros: u64,
@@ -58,6 +67,11 @@ pub struct IggyProducer {
     last_sent_at: Arc<AtomicU64>,
     send_retries_count: Option<u32>,
     send_retries_interval: Option<IggyDuration>,
+
+    _join_handle: Option<JoinHandle<()>>,
+    sema: Arc<Semaphore>,
+    sender: Option<Arc<flume::Sender<Vec<IggyMessage>>>>,
+    error_callback: Option<Arc<dyn ErrorCallback>>,
 }
 
 impl IggyProducer {
@@ -72,7 +86,7 @@ impl IggyProducer {
         partitioning: Option<Partitioning>,
         encryptor: Option<Arc<EncryptorKind>>,
         partitioner: Option<Arc<dyn Partitioner>>,
-        interval: Option<IggyDuration>,
+        linger_time: Option<IggyDuration>,
         create_stream_if_not_exists: bool,
         create_topic_if_not_exists: bool,
         topic_partitions_count: u32,
@@ -81,6 +95,8 @@ impl IggyProducer {
         topic_max_size: MaxTopicSize,
         send_retries_count: Option<u32>,
         send_retries_interval: Option<IggyDuration>,
+        send_mode: SendMode,
+        error_callback: Option<Arc<dyn ErrorCallback>>,
     ) -> Self {
         Self {
             initialized: false,
@@ -92,9 +108,10 @@ impl IggyProducer {
             topic_name,
             batch_length,
             partitioning: partitioning.map(Arc::new),
+            send_mode: Arc::new(send_mode),
             encryptor,
             partitioner,
-            linger_time_micros: interval.map_or(0, |i| i.as_micros()),
+            linger_time_micros: linger_time.map_or(0, |i| i.as_micros()),
             create_stream_if_not_exists,
             create_topic_if_not_exists,
             topic_partitions_count,
@@ -102,10 +119,14 @@ impl IggyProducer {
             topic_message_expiry,
             topic_max_size,
             default_partitioning: Arc::new(Partitioning::balanced()),
-            can_send_immediately: interval.is_none(),
+            can_send_immediately: linger_time.is_none(),
             last_sent_at: Arc::new(AtomicU64::new(0)),
             send_retries_count,
             send_retries_interval,
+            _join_handle: None,
+            sema: Arc::new(Semaphore::new(10)),
+            sender: None,
+            error_callback,
         }
     }
 
@@ -179,8 +200,84 @@ impl IggyProducer {
                 .await?;
         }
 
+        let (tx, rx) = flume::bounded::<Vec<IggyMessage>>(0); // todo задать 
какое-то значение
+        let stream_id = self.stream_id.clone();
+        let topic_id = self.topic_id.clone();
+        let partitioning = self.partitioning.clone();
+        let default_partitioning = self.default_partitioning.clone();
+        let partitioner = self.partitioner.clone();
+        let client = self.client.clone();
+        let send_retries_count = self.send_retries_count.clone();
+        let send_retries_interval = self.send_retries_interval.clone();
+        let can_send = self.can_send.clone();
+        let sema = self.sema.clone();
+
+        let handle = tokio::spawn(async move {
+            while let Ok(mut batch) = rx.recv_async().await {
+                // let sema = sema.clone();
+                // let partitioner = partitioner.clone();
+                // let stream_id = stream_id.clone();
+                // let topic_id = topic_id.clone();
+                // let partitioning = partitioning.clone();
+                // let default_partitioning = default_partitioning.clone();
+                // let client = client.clone();
+                // let can_send = can_send.clone();
+
+                let partitioning = get_partitioning(
+                    &partitioner,
+                    &stream_id,
+                    &topic_id,
+                    &batch,
+                    partitioning.clone(),
+                    &partitioning,
+                    default_partitioning.clone(),
+                )
+                .unwrap();
+                try_send_messages(
+                    client.clone(),
+                    send_retries_count,
+                    send_retries_interval,
+                    can_send.clone(),
+                    &stream_id,
+                    &topic_id,
+                    &partitioning,
+                    &mut batch,
+                )
+                .await
+                .unwrap();
+                // tokio::spawn(async move {
+                //     let _permit = sema.acquire().await.unwrap();
+                //     let partitioning = get_partitioning(
+                //         &partitioner,
+                //         &stream_id,
+                //         &topic_id,
+                //         &batch,
+                //         partitioning.clone(),
+                //         &partitioning,
+                //         default_partitioning.clone(),
+                //     )
+                //     .unwrap();
+                //     try_send_messages(
+                //         client.clone(),
+                //         send_retries_count,
+                //         send_retries_interval,
+                //         can_send.clone(),
+                //         &stream_id,
+                //         &topic_id,
+                //         &partitioning,
+                //         &mut batch,
+                //     ).await.unwrap();
+                // });
+            }
+        });
+        self._join_handle = Some(handle);
+        self.sender = Some(Arc::new(tx));
         self.initialized = true;
-        info!("Producer has been initialized for stream: {stream_id} and 
topic: {topic_id}.");
+        info!(
+            "Producer has been initialized for stream: {} and topic: {}.",
+            self.stream_id.clone(),
+            self.topic_id.clone()
+        );
         Ok(())
     }
 
@@ -242,6 +339,12 @@ impl IggyProducer {
         .await
     }
 
+    // todod добавить канал для считывания
+    pub async fn send_async(&self, messages: Vec<IggyMessage>) {
+        let sender = self.sender.clone();
+        sender.unwrap().send_async(messages).await.unwrap();
+    }
+
     pub async fn send_one(&self, message: IggyMessage) -> Result<(), 
IggyError> {
         self.send(vec![message]).await
     }
@@ -293,6 +396,7 @@ impl IggyProducer {
             .await
     }
 
+    // TODO add batch_size
     async fn send_buffered(
         &self,
         stream: Arc<Identifier>,
@@ -301,8 +405,18 @@ impl IggyProducer {
         partitioning: Option<Arc<Partitioning>>,
     ) -> Result<(), IggyError> {
         self.encrypt_messages(&mut messages)?;
-        let partitioning = self.get_partitioning(&stream, &topic, &messages, 
partitioning)?;
-        let batch_length = self.batch_length.unwrap_or(MAX_BATCH_SIZE);
+        let default_partitioning = self.default_partitioning.clone();
+        let partitioner = self.partitioner.clone();
+        let partitioning = get_partitioning(
+            &partitioner,
+            &stream,
+            &topic,
+            &messages,
+            partitioning,
+            &self.partitioning,
+            default_partitioning,
+        )?;
+        let batch_length = self.batch_length.unwrap_or(MAX_BATCH_LENGTH);
         let batches = messages.chunks_mut(batch_length);
         let mut current_batch = 1;
         let batches_count = batches.len();
@@ -321,8 +435,30 @@ impl IggyProducer {
             );
             self.last_sent_at
                 .store(IggyTimestamp::now().into(), ORDERING);
-            self.try_send_messages(&self.stream_id, &self.topic_id, 
&partitioning, batch)
-                .await?;
+
+            let client = self.client.clone();
+            let send_retries_count = self.send_retries_count.clone();
+            let send_retries_interval = self.send_retries_interval.clone();
+            let can_send = self.can_send.clone();
+
+            let sender = self.sender.clone();
+            let send_mode = self.send_mode.clone();
+            try_send_messages_new(
+                BackpressureMode::Block,
+                sender,
+                self.error_callback.clone(),
+                send_mode,
+                client,
+                send_retries_count,
+                send_retries_interval,
+                can_send,
+                &self.stream_id,
+                &self.topic_id,
+                &partitioning,
+                batch,
+            )
+            .await?;
+
             trace!("Sent {messages_count} messages 
({current_batch}/{batches_count} batch(es)).");
             current_batch += 1;
         }
@@ -338,21 +474,56 @@ impl IggyProducer {
     ) -> Result<(), IggyError> {
         trace!("No batch size specified, sending messages immediately.");
         self.encrypt_messages(&mut messages)?;
-        let partitioning = self.get_partitioning(stream, topic, &messages, 
partitioning)?;
-        let batch_length = self.batch_length.unwrap_or(MAX_BATCH_SIZE);
+        let default_partitioning = self.default_partitioning.clone();
+        let partitioner = self.partitioner.clone();
+        let client = self.client.clone();
+        let send_retries_count = self.send_retries_count.clone();
+        let send_retries_interval = self.send_retries_interval.clone();
+        let can_send = self.can_send.clone();
+
+        let partitioning = get_partitioning(
+            &partitioner,
+            &stream,
+            &topic,
+            &messages,
+            partitioning,
+            &self.partitioning,
+            default_partitioning,
+        )?;
+        let batch_length = self.batch_length.unwrap_or(MAX_BATCH_LENGTH);
         if messages.len() <= batch_length {
             self.last_sent_at
                 .store(IggyTimestamp::now().into(), ORDERING);
-            self.try_send_messages(stream, topic, &partitioning, &mut messages)
-                .await?;
+            try_send_messages(
+                client.clone(),
+                send_retries_count,
+                send_retries_interval,
+                can_send,
+                stream,
+                topic,
+                &partitioning,
+                &mut messages,
+            )
+            .await?;
             return Ok(());
         }
 
         for batch in messages.chunks_mut(batch_length) {
+            let client = client.clone();
+            let can_send = can_send.clone();
             self.last_sent_at
                 .store(IggyTimestamp::now().into(), ORDERING);
-            self.try_send_messages(stream, topic, &partitioning, batch)
-                .await?;
+            try_send_messages(
+                client,
+                send_retries_count,
+                send_retries_interval,
+                can_send,
+                stream,
+                topic,
+                &partitioning,
+                batch,
+            )
+            .await?;
         }
         Ok(())
     }
@@ -385,144 +556,236 @@ impl IggyProducer {
         }
         Ok(())
     }
+}
 
-    async fn try_send_messages(
-        &self,
-        stream: &Identifier,
-        topic: &Identifier,
-        partitioning: &Arc<Partitioning>,
-        messages: &mut [IggyMessage],
-    ) -> Result<(), IggyError> {
-        let client = self.client.read().await;
-        let Some(max_retries) = self.send_retries_count else {
-            return client
-                .send_messages(stream, topic, partitioning, messages)
-                .await;
-        };
+fn get_partitioning(
+    partitioner: &Option<Arc<dyn Partitioner>>,
+    stream: &Identifier,
+    topic: &Identifier,
+    messages: &[IggyMessage],
+    partitioning: Option<Arc<Partitioning>>,
+    producer_partitioning: &Option<Arc<Partitioning>>,
+    default_partitioning: Arc<Partitioning>,
+) -> Result<Arc<Partitioning>, IggyError> {
+    if let Some(partitioner) = partitioner {
+        trace!("Calculating partition id using custom partitioner.");
+        let partition_id = partitioner.calculate_partition_id(stream, topic, 
messages)?;
+        Ok(Arc::new(Partitioning::partition_id(partition_id)))
+    } else {
+        trace!("Using the provided partitioning.");
+        Ok(partitioning.unwrap_or_else(|| {
+            producer_partitioning
+                .clone()
+                .unwrap_or_else(|| default_partitioning.clone())
+        }))
+    }
+}
 
-        if max_retries == 0 {
-            return client
-                .send_messages(stream, topic, partitioning, messages)
-                .await;
+async fn wait_until_connected(
+    can_send: Arc<AtomicBool>,
+    max_retries: u32,
+    stream: &Identifier,
+    topic: &Identifier,
+    timer: &mut Option<Interval>,
+) -> Result<(), IggyError> {
+    let mut retries = 0;
+    while !can_send.load(ORDERING) {
+        retries += 1;
+        if retries > max_retries {
+            error!(
+                "Failed to send messages to topic: {topic}, stream: {stream} \
+                 after {max_retries} retries. Client is disconnected."
+            );
+            return Err(IggyError::CannotSendMessagesDueToClientDisconnection);
         }
 
-        let mut timer = if let Some(interval) = self.send_retries_interval {
-            let mut timer = tokio::time::interval(interval.get_duration());
-            timer.tick().await;
-            Some(timer)
-        } else {
-            None
-        };
+        error!(
+            "Trying to send messages to topic: {topic}, stream: {stream} \
+             but the client is disconnected. Retrying 
{retries}/{max_retries}..."
+        );
 
-        self.wait_until_connected(max_retries, stream, topic, &mut timer)
-            .await?;
-        self.send_with_retries(
-            max_retries,
-            stream,
-            topic,
-            partitioning,
-            messages,
-            &mut timer,
-        )
-        .await
+        if let Some(timer) = timer.as_mut() {
+            trace!(
+                "Waiting for the next retry to send messages to topic: 
{topic}, \
+                 stream: {stream} for disconnected client..."
+            );
+            timer.tick().await;
+        }
     }
+    Ok(())
+}
+
+async fn send_with_retries(
+    client: Arc<IggySharedMut<Box<dyn Client>>>,
+    max_retries: u32,
+    stream: &Identifier,
+    topic: &Identifier,
+    partitioning: &Arc<Partitioning>,
+    messages: &mut [IggyMessage],
+    timer: &mut Option<Interval>,
+) -> Result<(), IggyError> {
+    let client = client.read().await;
+    let mut retries = 0;
+    loop {
+        match client
+            .send_messages(stream, topic, partitioning, messages)
+            .await
+        {
+            Ok(_) => return Ok(()),
+            Err(error) => {
+                retries += 1;
+                if retries > max_retries {
+                    error!(
+                        "Failed to send messages to topic: {topic}, stream: 
{stream} \
+                         after {max_retries} retries. {error}."
+                    );
+                    return Err(error);
+                }
 
-    async fn wait_until_connected(
-        &self,
-        max_retries: u32,
-        stream: &Identifier,
-        topic: &Identifier,
-        timer: &mut Option<Interval>,
-    ) -> Result<(), IggyError> {
-        let mut retries = 0;
-        while !self.can_send.load(ORDERING) {
-            retries += 1;
-            if retries > max_retries {
                 error!(
-                    "Failed to send messages to topic: {topic}, stream: 
{stream} \
-                     after {max_retries} retries. Client is disconnected."
+                    "Failed to send messages to topic: {topic}, stream: 
{stream}. \
+                     {error} Retrying {retries}/{max_retries}..."
                 );
-                return 
Err(IggyError::CannotSendMessagesDueToClientDisconnection);
-            }
-
-            error!(
-                "Trying to send messages to topic: {topic}, stream: {stream} \
-                 but the client is disconnected. Retrying 
{retries}/{max_retries}..."
-            );
 
-            if let Some(timer) = timer.as_mut() {
-                trace!(
-                    "Waiting for the next retry to send messages to topic: 
{topic}, \
-                     stream: {stream} for disconnected client..."
-                );
-                timer.tick().await;
+                if let Some(t) = timer.as_mut() {
+                    trace!(
+                        "Waiting for the next retry to send messages to topic: 
{topic}, \
+                         stream: {stream}..."
+                    );
+                    t.tick().await;
+                }
             }
         }
-        Ok(())
     }
+}
 
-    async fn send_with_retries(
-        &self,
-        max_retries: u32,
-        stream: &Identifier,
-        topic: &Identifier,
-        partitioning: &Arc<Partitioning>,
-        messages: &mut [IggyMessage],
-        timer: &mut Option<Interval>,
-    ) -> Result<(), IggyError> {
-        let client = self.client.read().await;
-        let mut retries = 0;
-        loop {
-            match client
-                .send_messages(stream, topic, partitioning, messages)
-                .await
-            {
-                Ok(_) => return Ok(()),
-                Err(error) => {
-                    retries += 1;
-                    if retries > max_retries {
-                        error!(
-                            "Failed to send messages to topic: {topic}, 
stream: {stream} \
-                             after {max_retries} retries. {error}."
-                        );
-                        return Err(error);
+async fn try_send_messages(
+    client: Arc<IggySharedMut<Box<dyn Client>>>,
+    send_retries_count: Option<u32>,
+    send_retries_interval: Option<IggyDuration>,
+    can_send: Arc<AtomicBool>,
+    stream: &Identifier,
+    topic: &Identifier,
+    partitioning: &Arc<Partitioning>,
+    messages: &mut [IggyMessage],
+) -> Result<(), IggyError> {
+    let rw_client = client.read().await;
+    let Some(max_retries) = send_retries_count else {
+        return rw_client
+            .send_messages(stream, topic, partitioning, messages)
+            .await;
+    };
+
+    if max_retries == 0 {
+        return rw_client
+            .send_messages(stream, topic, partitioning, messages)
+            .await;
+    }
+
+    let mut timer = if let Some(interval) = send_retries_interval {
+        let mut timer = tokio::time::interval(interval.get_duration());
+        timer.tick().await;
+        Some(timer)
+    } else {
+        None
+    };
+
+    wait_until_connected(can_send.clone(), max_retries, stream, topic, &mut 
timer).await?;
+    send_with_retries(
+        client.clone(),
+        max_retries,
+        stream,
+        topic,
+        partitioning,
+        messages,
+        &mut timer,
+    )
+    .await
+}
+
+async fn try_send_messages_new(
+    backpressure_mode: BackpressureMode,
+    sender: Option<Arc<flume::Sender<Vec<IggyMessage>>>>,
+    error_callback: Option<Arc<dyn ErrorCallback>>,
+    send_mode: Arc<SendMode>,
+    client: Arc<IggySharedMut<Box<dyn Client>>>,
+    send_retries_count: Option<u32>,
+    send_retries_interval: Option<IggyDuration>,
+    can_send: Arc<AtomicBool>,
+    stream: &Identifier,
+    topic: &Identifier,
+    partitioning: &Arc<Partitioning>,
+    messages: &mut [IggyMessage],
+) -> Result<(), IggyError> {
+    match &*send_mode {
+        SendMode::Sync => {
+            try_send_messages(
+                client,
+                send_retries_count,
+                send_retries_interval,
+                can_send,
+                stream,
+                topic,
+                partitioning,
+                messages,
+            )
+            .await
+        }
+
+        SendMode::Background(_cfg) => {
+            let out: Vec<IggyMessage> = 
messages.iter_mut().map(std::mem::take).collect();
+            let sender = sender.as_ref().expect("init() must set 
sender").clone();
+
+            match sender.try_send(out) {
+                Ok(()) => Ok(()),
+
+                Err(flume::TrySendError::Full(out)) => match backpressure_mode 
{
+                    BackpressureMode::Block => match 
sender.send_async(out).await {
+                        Ok(()) => Ok(()),
+                        Err(flume::SendError(out)) => {
+                            if let Some(cb) = error_callback {
+                                cb.call(IggyError::BackgroundSendError, out);
+                            }
+                            Err(IggyError::BackgroundSendError)
+                        }
+                    },
+
+                    BackpressureMode::BlockWithTimeout(t) => {
+                        match tokio::time::timeout(t.get_duration(), 
sender.send_async(out)).await {
+                            Ok(Ok(())) => Ok(()),
+
+                            Ok(Err(flume::SendError(out))) => {
+                                if let Some(cb) = error_callback {
+                                    cb.call(IggyError::BackgroundSendTimeout, 
out);
+                                }
+                                Err(IggyError::BackgroundSendTimeout)
+                            }
+
+                            Err(_) => {
+                                if let Some(cb) = &error_callback {
+                                    cb.call(IggyError::BackgroundSendTimeout, 
vec![]);
+                                }
+                                Err(IggyError::BackgroundSendTimeout)
+                            }
+                        }
                     }
 
-                    error!(
-                        "Failed to send messages to topic: {topic}, stream: 
{stream}. \
-                         {error} Retrying {retries}/{max_retries}..."
-                    );
+                    BackpressureMode::FailImmediately => {
+                        if let Some(cb) = &error_callback {
+                            cb.call(IggyError::BackgroundSendBufferFull, out);
+                        }
+                        Err(IggyError::BackgroundSendBufferFull)
+                    }
+                },
 
-                    if let Some(t) = timer.as_mut() {
-                        trace!(
-                            "Waiting for the next retry to send messages to 
topic: {topic}, \
-                             stream: {stream}..."
-                        );
-                        t.tick().await;
+                Err(flume::TrySendError::Disconnected(out)) => {
+                    if let Some(cb) = &error_callback {
+                        cb.call(IggyError::BackgroundWorkerDisconnected, out);
                     }
+                    error!("Background worker has shut down.");
+                    Err(IggyError::BackgroundWorkerDisconnected)
                 }
             }
         }
     }
-
-    fn get_partitioning(
-        &self,
-        stream: &Identifier,
-        topic: &Identifier,
-        messages: &[IggyMessage],
-        partitioning: Option<Arc<Partitioning>>,
-    ) -> Result<Arc<Partitioning>, IggyError> {
-        if let Some(partitioner) = &self.partitioner {
-            trace!("Calculating partition id using custom partitioner.");
-            let partition_id = partitioner.calculate_partition_id(stream, 
topic, messages)?;
-            Ok(Arc::new(Partitioning::partition_id(partition_id)))
-        } else {
-            trace!("Using the provided partitioning.");
-            Ok(partitioning.unwrap_or_else(|| {
-                self.partitioning
-                    .clone()
-                    .unwrap_or_else(|| self.default_partitioning.clone())
-            }))
-        }
-    }
 }
diff --git a/core/sdk/src/clients/producer_builder.rs 
b/core/sdk/src/clients/producer_builder.rs
index 76ecfeea..4d9590f4 100644
--- a/core/sdk/src/clients/producer_builder.rs
+++ b/core/sdk/src/clients/producer_builder.rs
@@ -15,8 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use super::MAX_BATCH_SIZE;
-use crate::prelude::IggyProducer;
+use super::send_mode::{self, SendMode};
+use super::MAX_BATCH_LENGTH;
+use crate::prelude::{IggyProducer, ErrorCallback};
 use iggy_binary_protocol::Client;
 use iggy_common::locking::IggySharedMut;
 use iggy_common::{
@@ -36,6 +37,7 @@ pub struct IggyProducerBuilder {
     encryptor: Option<Arc<EncryptorKind>>,
     partitioner: Option<Arc<dyn Partitioner>>,
     linger_time: Option<IggyDuration>,
+    send_mode: SendMode,
     create_stream_if_not_exists: bool,
     create_topic_if_not_exists: bool,
     topic_partitions_count: u32,
@@ -44,6 +46,7 @@ pub struct IggyProducerBuilder {
     send_retries_interval: Option<IggyDuration>,
     topic_message_expiry: IggyExpiry,
     topic_max_size: MaxTopicSize,
+    error_callback: Option<Arc<dyn ErrorCallback>>
 }
 
 impl IggyProducerBuilder {
@@ -67,6 +70,7 @@ impl IggyProducerBuilder {
             partitioning: None,
             encryptor,
             partitioner,
+            send_mode: SendMode::default(),
             linger_time: Some(IggyDuration::from(1000)),
             create_stream_if_not_exists: true,
             create_topic_if_not_exists: true,
@@ -76,6 +80,7 @@ impl IggyProducerBuilder {
             topic_max_size: MaxTopicSize::ServerDefault,
             send_retries_count: Some(3),
             send_retries_interval: Some(IggyDuration::ONE_SECOND),
+            error_callback: None,
         }
     }
 
@@ -95,7 +100,7 @@ impl IggyProducerBuilder {
             batch_length: if batch_length == 0 {
                 None
             } else {
-                Some(batch_length.min(MAX_BATCH_SIZE as u32) as usize)
+                Some(batch_length.min(MAX_BATCH_LENGTH as u32) as usize)
             },
             ..self
         }
@@ -125,6 +130,20 @@ impl IggyProducerBuilder {
         }
     }
 
+    pub fn send_mode(self, send_mode: SendMode) -> Self {
+        Self {
+            send_mode,
+            ..self
+        }
+    }
+
+    pub fn error_callback(self, cb: Arc<dyn ErrorCallback>) -> Self {
+        Self {
+            error_callback: Some(cb),
+            ..self
+        }
+    }
+
     /// Sets the encryptor for encrypting the messages' payloads.
     pub fn encryptor(self, encryptor: Arc<EncryptorKind>) -> Self {
         Self {
@@ -249,6 +268,8 @@ impl IggyProducerBuilder {
             self.topic_max_size,
             self.send_retries_count,
             self.send_retries_interval,
+            self.send_mode,
+            self.error_callback,
         )
     }
 }
diff --git a/core/sdk/src/clients/send_mode.rs 
b/core/sdk/src/clients/send_mode.rs
new file mode 100644
index 00000000..98b3abef
--- /dev/null
+++ b/core/sdk/src/clients/send_mode.rs
@@ -0,0 +1,27 @@
+use iggy_common::IggyDuration;
+
+#[derive(Debug, Clone, Default)]
+pub enum SendMode {
+    #[default]
+    Sync,
+    Background(BackgroundConfig),
+}
+
+#[derive(Debug, Clone)]
+/// Determines how the `send_messages` API should behave when problem is 
encountered
+pub enum BackpressureMode {
+    /// Block until the send succeeds
+    Block,
+    /// Block with a timeout, after which the send fails
+    BlockWithTimeout(IggyDuration),
+    /// Fail immediately without retrying
+    FailImmediately,
+}
+
+#[derive(Debug, Clone)]
+pub struct BackgroundConfig {
+    pub max_in_flight: usize,
+    pub in_flight_timeout: Option<IggyDuration>,
+    pub batch_size: Option<usize>,
+    pub failure_mode: BackpressureMode,
+}
diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs
index 3d87cc82..9b13f218 100644
--- a/core/sdk/src/prelude.rs
+++ b/core/sdk/src/prelude.rs
@@ -35,7 +35,7 @@ pub use crate::clients::consumer::{
     AutoCommit, AutoCommitAfter, AutoCommitWhen, IggyConsumer, ReceivedMessage,
 };
 pub use crate::clients::consumer_builder::IggyConsumerBuilder;
-pub use crate::clients::producer::IggyProducer;
+pub use crate::clients::producer::{IggyProducer, ErrorCallback};
 pub use crate::clients::producer_builder::IggyProducerBuilder;
 pub use crate::consumer_ext::IggyConsumerMessageExt;
 pub use crate::stream_builder::IggyConsumerConfig;


Reply via email to