This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch feat/add-background-send
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 1263492d30cebf2515e7cc622b5c6d473d14fac0
Author: haze518 <[email protected]>
AuthorDate: Mon Jun 2 06:24:15 2025 +0600

    del
---
 core/common/src/error/iggy_error.rs                |  11 +-
 .../tests/examples/test_new_publisher.rs           |   3 +-
 core/sdk/src/clients/mod.rs                        |   1 -
 core/sdk/src/clients/producer.rs                   | 216 +++++++++++-------
 core/sdk/src/clients/producer_builder.rs           |  10 -
 core/sdk/src/clients/producer_dispatcher.rs        | 253 +++++++++++++++++----
 core/sdk/src/clients/send_mode.rs                  | 131 -----------
 7 files changed, 352 insertions(+), 273 deletions(-)

diff --git a/core/common/src/error/iggy_error.rs 
b/core/common/src/error/iggy_error.rs
index 9fe20359..487224f1 100644
--- a/core/common/src/error/iggy_error.rs
+++ b/core/common/src/error/iggy_error.rs
@@ -16,8 +16,10 @@
  * under the License.
  */
 
-use crate::utils::byte_size::IggyByteSize;
+use std::sync::Arc;
+
 use crate::utils::topic_size::MaxTopicSize;
+use crate::{IggyMessage, utils::byte_size::IggyByteSize};
 
 use strum::{EnumDiscriminants, FromRepr, IntoStaticStr};
 use thiserror::Error;
@@ -371,6 +373,13 @@ pub enum IggyError {
     BackgroundWorkerDisconnected = 4054,
     #[error("Background send buffer overflow")]
     BackgroundSendBufferOverflow = 4055,
+    #[error("Producer send failed")]
+    ProducerSendFailed {
+        cause: String,
+        failed: Arc<Vec<IggyMessage>>,
+    } = 4056,
+    #[error("Producer closed")]
+    ProducerClosed = 4057,
     #[error("Invalid offset: {0}")]
     InvalidOffset(u64) = 4100,
     #[error("Consumer group with ID: {0} for topic with ID: {1} was not 
found.")]
diff --git a/core/integration/tests/examples/test_new_publisher.rs 
b/core/integration/tests/examples/test_new_publisher.rs
index d9a2517e..5e425e2d 100644
--- a/core/integration/tests/examples/test_new_publisher.rs
+++ b/core/integration/tests/examples/test_new_publisher.rs
@@ -6,7 +6,6 @@ use std::time::Instant;
 
 use bytes::Bytes;
 use futures::StreamExt;
-use iggy::clients::send_mode::{BackgroundConfig, BackpressureMode, SendMode};
 use iggy::prelude::defaults::*;
 use iggy::prelude::*;
 use iggy::{clients::client::IggyClient, prelude::TcpClient};
@@ -45,7 +44,7 @@ async fn test_new_publisher() {
         .producer("1", "1")
         .unwrap()
         .batch_length(10)
-        .send_mode(SendMode::Background)
+        // .send_mode(SendMode::Background)
         .build();
 
     producer.init().await.unwrap();
diff --git a/core/sdk/src/clients/mod.rs b/core/sdk/src/clients/mod.rs
index 6f8f5f7c..7524e9e5 100644
--- a/core/sdk/src/clients/mod.rs
+++ b/core/sdk/src/clients/mod.rs
@@ -32,7 +32,6 @@ pub mod consumer;
 pub mod consumer_builder;
 pub mod producer;
 pub mod producer_builder;
-pub mod send_mode;
 mod producer_dispatcher;
 
 const ORDERING: std::sync::atomic::Ordering = 
std::sync::atomic::Ordering::SeqCst;
diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index 2b45ae38..ed22d5a3 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -1,4 +1,3 @@
-use super::send_mode::{BackgroundConfig, BackpressureMode, Dispatcher, 
SendMode, shardMessage};
 /* Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -26,13 +25,13 @@ use iggy_common::{
     CompressionAlgorithm, DiagnosticEvent, EncryptorKind, IdKind, Identifier, 
IggyDuration,
     IggyError, IggyExpiry, IggyMessage, IggyTimestamp, MaxTopicSize, 
Partitioner, Partitioning,
 };
+use super::producer_dispatcher::{ProducerDispatcher, BalancedSharding, 
Sharding};
 use std::fmt::Debug;
+use std::marker::PhantomData;
 use std::sync::Arc;
 use std::sync::atomic::Ordering;
 use std::sync::atomic::{AtomicBool, AtomicU64};
 use std::time::Duration;
-use tokio::sync::Semaphore;
-use tokio::task::JoinHandle;
 use tokio::time::{Interval, sleep};
 use tracing::{error, info, trace, warn};
 
@@ -74,12 +73,6 @@ pub struct ProducerCore {
     last_sent_at: Arc<AtomicU64>,
     send_retries_count: Option<u32>,
     send_retries_interval: Option<IggyDuration>,
-
-    _join_handle: Option<JoinHandle<()>>,
-    sema: Arc<Semaphore>,
-    sender: Option<Arc<flume::Sender<Vec<IggyMessage>>>>,
-    error_callback: Option<Arc<dyn ErrorCallback>>,
-    shard_number: usize,
 }
 
 impl ProducerCore {
@@ -191,37 +184,75 @@ impl ProducerCore {
         });
     }
 
-    pub(crate) async fn send_internal(
-        &self,
-        stream: &Identifier,
-        topic: &Identifier,
-        mut msgs: Vec<IggyMessage>,
-        partitioning: Option<Arc<Partitioning>>,
-    ) -> Result<(), IggyError> {
-        if msgs.is_empty() {
-            return Ok(());
+// If we do not need to split into chunks, we can avoid carrying the msgs in the error:
+// it would be enough to add a function that accepts a slice.
+// Then we could pass the slice there for sending while still keeping access to the messages.
+// This needs more thought.
+// 
+pub(crate) async fn send_internal(
+    &self,
+    stream: &Identifier,
+    topic:  &Identifier,
+    mut msgs: Vec<IggyMessage>,
+    partitioning: Option<Arc<Partitioning>>,
+    split_chunks: bool
+) -> Result<(), IggyError> {
+    if msgs.is_empty() {
+        return Ok(());
+    }
+
+    if let Err(err) = self.encrypt_messages(&mut msgs) {
+        return Err(IggyError::ProducerSendFailed {
+            cause:  err.to_string(),
+            failed: Arc::new(msgs),
+        });
+    }
+
+    let part = match self.get_partitioning(stream, topic, &msgs, 
partitioning.clone()) {
+        Ok(p)  => p,
+        Err(err) => {
+            return Err(IggyError::ProducerSendFailed {
+                cause:  err.to_string(),
+                failed: Arc::new(msgs),
+            });
         }
+    };
 
-        self.encrypt_messages(&mut msgs)?;
+    if !self.can_send_immediately && self.linger_time_micros > 0 {
+        Self::wait_before_sending(self.linger_time_micros,
+                                  self.last_sent_at.load(ORDERING)).await;
+    }
 
-        let part = self.get_partitioning(stream, topic, &msgs, partitioning)?;
+    if !split_chunks {
+        self.try_send_messages(stream, topic, &part, &mut 
msgs).await.map_err(|err| IggyError::ProducerSendFailed {
+                cause:  err.to_string(),
+                failed: Arc::new(msgs),
+        })?;
+        self.last_sent_at.store(IggyTimestamp::now().into(), ORDERING);
+        return Ok(())
+    }
 
-        // todo add batch_size or batch_length
-        let max = self.batch_length.unwrap_or(MAX_BATCH_LENGTH);
+    let max = self.batch_length.unwrap_or(MAX_BATCH_LENGTH);
 
-        if !self.can_send_immediately && self.linger_time_micros > 0 {
-            Self::wait_before_sending(self.linger_time_micros, 
self.last_sent_at.load(ORDERING))
-                .await;
-        }
+    let mut index = 0;
+    while index < msgs.len() {
+        let end   = (index + max).min(msgs.len());
+        let chunk = &mut msgs[index..end];
 
-        for chunk in msgs.chunks_mut(max) {
-            self.last_sent_at
-                .store(IggyTimestamp::now().into(), ORDERING);
-            self.try_send_messages(stream, topic, &part, chunk).await?;
+        if let Err(err) = self.try_send_messages(stream, topic, &part, 
chunk).await {
+            let failed_tail = msgs.split_off(index);
+            return Err(IggyError::ProducerSendFailed {
+                cause:  err.to_string(),
+                failed: Arc::new(failed_tail),
+            });
         }
-        Ok(())
+        self.last_sent_at.store(IggyTimestamp::now().into(), ORDERING);
+        index = end;
     }
 
+    Ok(())
+}
+
     async fn try_send_messages(
         &self,
         stream: &Identifier,
@@ -392,20 +423,62 @@ impl ProducerCore {
     }
 }
 
-pub trait ErrorCallback: Send + Sync + Debug {
-    fn call(&self, error: IggyError, messages: Vec<IggyMessage>);
+pub struct ErrorCtx {
+    pub cause:        String,
+    pub stream:       Arc<Identifier>,
+    pub topic:        Arc<Identifier>,
+    pub partitioning: Option<Arc<Partitioning>>,
+    pub messages:     Arc<Vec<IggyMessage>>,
+}
+
+pub trait ErrorCallback: Send + Sync + Debug + 'static {
+    fn call(&self, ctx: ErrorCtx) -> impl Future<Output = ()> + Send;
+}
+
+pub struct LogErrorCallback;
+
+impl std::fmt::Debug for LogErrorCallback {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("LogErrorCallback").finish()
+    }
+}
+
+impl ErrorCallback for LogErrorCallback {
+    fn call(&self, ctx: ErrorCtx) -> impl Future<Output = ()> + Send {
+        async move {
+            let partitioning = ctx.partitioning
+                .as_ref()
+                .map(|p| format!("{:?}", p))
+                .unwrap_or_else(|| "None".to_string());
+
+            error!(
+                cause = ctx.cause,
+                stream = %ctx.stream,
+                topic = %ctx.topic,
+                partitioning = %partitioning,
+                num_messages = ctx.messages.len(),
+                "Failed to send messages in background task",
+            );
+        }
+    }
 }
 
-unsafe impl Send for IggyProducer {}
-unsafe impl Sync for IggyProducer {}
 
-pub struct IggyProducer {
+unsafe impl<S: Sharding, E: ErrorCallback> Send for IggyProducer<S, E> {}
+unsafe impl<S: Sharding, E: ErrorCallback> Sync for IggyProducer<S, E> {}
+
+pub struct IggyProducer<S: Sharding, E: ErrorCallback> {
     core: Arc<ProducerCore>,
-    send_mode: SendMode,
-    dispatcher: Option<Dispatcher>,
+    dispatcher: Option<ProducerDispatcher<S, E>>,
+
+    _phantom: PhantomData<E>,
 }
 
-impl IggyProducer {
+impl<S, E> IggyProducer<S, E>
+where
+    S: Sharding,
+    E: ErrorCallback,
+{
     #[allow(clippy::too_many_arguments)]
     pub(crate) fn new(
         client: IggySharedMut<Box<dyn Client>>,
@@ -426,8 +499,7 @@ impl IggyProducer {
         topic_max_size: MaxTopicSize,
         send_retries_count: Option<u32>,
         send_retries_interval: Option<IggyDuration>,
-        send_mode: SendMode,
-        error_callback: Option<Arc<dyn ErrorCallback>>,
+        background_config: Option<BackgroundConfig<S, E>>,
     ) -> Self {
         let core = Arc::new(ProducerCore {
             initialized: AtomicBool::new(true),
@@ -453,27 +525,16 @@ impl IggyProducer {
             last_sent_at: Arc::new(AtomicU64::new(0)),
             send_retries_count,
             send_retries_interval,
-            _join_handle: None,
-            sema: Arc::new(Semaphore::new(10)),
-            sender: None,
-            error_callback,
-            shard_number: default_shard_count(),
         });
-        let num_shards = default_shard_count(); // todo потом заменмить на 
норм значепние
-        let config = BackgroundConfig {
-            max_in_flight: 4,
-            in_flight_timeout: None,
-            batch_size: None,
-            failure_mode: BackpressureMode::Block,
+        let dispatcher = match background_config {
+            Some(config) => {
+                Some(ProducerDispatcher::new(core.clone(), config))
+            }
+            None => None
         };
-        let mut dispatcher = None;
-        if send_mode == SendMode::Background {
-            dispatcher = Some(Dispatcher::new(core.clone(), num_shards, 
Arc::new(config)));
-        }
 
         Self {
             core,
-            send_mode,
             dispatcher: dispatcher,
         }
     }
@@ -502,24 +563,9 @@ impl IggyProducer {
         let stream_id = self.core.stream_id.clone();
         let topic_id = self.core.topic_id.clone();
 
-        match &self.send_mode {
-            SendMode::Sync => {
-                self.core
-                    .send_internal(&stream_id, &topic_id, messages, None)
-                    .await
-            }
-            SendMode::Background => match &self.dispatcher {
-                Some(disp) => disp
-                    .dispatch(shardMessage {
-                        messages,
-                        stream: stream_id,
-                        topic: topic_id,
-                        partitioning: None,
-                    })
-                    .await
-                    .map_err(|err| IggyError::BackgroundSendError),
-                None => Ok(()),
-            },
+        match &self.dispatcher {
+            Some(disp) => disp.dispatch(messages, stream_id, topic_id, 
None).await,
+            None => self.core.send_internal(&stream_id, &topic_id, messages, 
None, true).await,
         }
     }
 
@@ -537,12 +583,13 @@ impl IggyProducer {
             return Ok(());
         }
 
-        let stream_id = &self.core.stream_id;
-        let topic_id = &self.core.topic_id;
+        let stream_id = self.core.stream_id.clone();
+        let topic_id = self.core.topic_id.clone();
 
-        self.core
-            .send_internal(stream_id, topic_id, messages, partitioning)
-            .await
+        match &self.dispatcher {
+            Some(disp) => disp.dispatch(messages, stream_id, topic_id, 
partitioning).await,
+            None => self.core.send_internal(&stream_id, &topic_id, messages, 
partitioning, true).await,
+        }
     }
 
     pub async fn send_to(
@@ -557,10 +604,17 @@ impl IggyProducer {
             return Ok(());
         }
 
+        // todo add send via dispatcher
         self.core
-            .send_internal(&stream, &topic, messages, partitioning)
+            .send_internal(&stream, &topic, messages, partitioning, true)
             .await
     }
+
+    pub async fn shutdown(self) {
+        if let Some(disp) = self.dispatcher {
+            disp.shutdown().await;
+        }
+    }
 }
 
 fn default_shard_count() -> usize {
diff --git a/core/sdk/src/clients/producer_builder.rs 
b/core/sdk/src/clients/producer_builder.rs
index 4d9590f4..89843800 100644
--- a/core/sdk/src/clients/producer_builder.rs
+++ b/core/sdk/src/clients/producer_builder.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use super::send_mode::{self, SendMode};
 use super::MAX_BATCH_LENGTH;
 use crate::prelude::{IggyProducer, ErrorCallback};
 use iggy_binary_protocol::Client;
@@ -37,7 +36,6 @@ pub struct IggyProducerBuilder {
     encryptor: Option<Arc<EncryptorKind>>,
     partitioner: Option<Arc<dyn Partitioner>>,
     linger_time: Option<IggyDuration>,
-    send_mode: SendMode,
     create_stream_if_not_exists: bool,
     create_topic_if_not_exists: bool,
     topic_partitions_count: u32,
@@ -70,7 +68,6 @@ impl IggyProducerBuilder {
             partitioning: None,
             encryptor,
             partitioner,
-            send_mode: SendMode::default(),
             linger_time: Some(IggyDuration::from(1000)),
             create_stream_if_not_exists: true,
             create_topic_if_not_exists: true,
@@ -130,13 +127,6 @@ impl IggyProducerBuilder {
         }
     }
 
-    pub fn send_mode(self, send_mode: SendMode) -> Self {
-        Self {
-            send_mode,
-            ..self
-        }
-    }
-
     pub fn error_callback(self, cb: Arc<dyn ErrorCallback>) -> Self {
         Self {
             error_callback: Some(cb),
diff --git a/core/sdk/src/clients/producer_dispatcher.rs 
b/core/sdk/src/clients/producer_dispatcher.rs
index 2f6a25d7..818c958c 100644
--- a/core/sdk/src/clients/producer_dispatcher.rs
+++ b/core/sdk/src/clients/producer_dispatcher.rs
@@ -1,15 +1,41 @@
-use std::sync::{
-    Arc,
-    atomic::{AtomicUsize, Ordering},
-};
+use std::{sync::{
+    atomic::{AtomicBool, AtomicUsize, Ordering}, Arc
+}, time::Duration};
 
 use iggy_common::{
     Identifier, IggyByteSize, IggyDuration, IggyError, IggyMessage, 
Partitioning, Sizeable,
 };
 use tokio::{sync::Notify, task::JoinHandle};
 use tracing::error;
+use futures::FutureExt;
+
+use crate::prelude::ErrorCallback;
+
+#[derive(Debug, Clone)]
+/// Determines how the `send_messages` API should behave when a problem is encountered
+pub enum BackpressureMode {
+    /// Block until the send succeeds
+    Block,
+    /// Block with a timeout, after which the send fails
+    BlockWithTimeout(IggyDuration),
+    /// Fail immediately without retrying
+    FailImmediately,
+}
+
+#[derive(Debug, Clone)]
+pub struct BackgroundConfig<S: Sharding, E: ErrorCallback> {
+    pub max_in_flight: usize,
+    pub in_flight_timeout: Option<IggyDuration>,
+    pub batch_size: Option<usize>,
+    pub batch_length: Option<usize>,
+    pub failure_mode: BackpressureMode,
+    pub buffer_size: Option<IggyByteSize>,
+    pub linger_time: Option<IggyDuration>,
+    pub error_callback: Arc<E>,
+    pub sharding: Box<S>,
+}
 
-use super::{producer::ProducerCore, send_mode::BackgroundConfig};
+use super::{producer::{ErrorCtx, ProducerCore}};
 
 #[derive(Debug)]
 pub struct ShardMessage {
@@ -38,23 +64,123 @@ pub struct Shard {
 }
 
 impl Shard {
+    pub fn new_(
+        core: Arc<ProducerCore>,
+        global_buffer: Arc<AtomicUsize>,
+        config: Arc<BackgroundConfig<impl Sharding, impl ErrorCallback>>,
+        notify: Arc<Notify>,
+        err_sender: flume::Sender<ErrorCtx>,
+    ) -> Self {
+        let (tx, rx) = flume::bounded::<ShardMessage>(1024);
+
+        let handle = tokio::spawn(async move {
+            let mut buffer = Vec::new();
+            let mut buffer_bytes = 0usize;
+            let mut last_flush = tokio::time::Instant::now();
+
+            loop {
+                tokio::select! {
+                    maybe_msg = rx.recv_async() => {
+                        match maybe_msg {
+                            Ok(msg) => {
+                                buffer_bytes += 
msg.get_size_bytes().as_bytes_usize();
+                                buffer.push(msg);
+
+                                let exceed_batch_len = config.batch_length
+                                    .map_or(false, |len| buffer.len() >= len);
+                                let exceed_batch_size = config.batch_size
+                                    .map_or(false, |size| buffer_bytes >= 
size);
+
+                                if exceed_batch_len || exceed_batch_size {
+                                    Self::flush_buffer(&core, &mut buffer, 
&mut buffer_bytes, &global_buffer, &err_sender, &notify).await;
+                                    last_flush = tokio::time::Instant::now();
+                                }
+                            }
+                            Err(_) => break,
+                        }
+                    }
+                    _ = 
tokio::time::sleep(config.linger_time.map_or(Duration::from_secs(u64::MAX), |d| 
d.get_duration())) => {
+                        if !buffer.is_empty() {
+                            Self::flush_buffer(&core, &mut buffer, &mut 
buffer_bytes, &global_buffer, &err_sender, &notify).await;
+                            last_flush = tokio::time::Instant::now();
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    async fn flush_buffer(
+        core: &Arc<ProducerCore>,
+        buffer: &mut Vec<ShardMessage>,
+        buffer_bytes: &mut usize,
+        global_buffer: &Arc<AtomicUsize>,
+        err_sender: &flume::Sender<ErrorCtx>,
+        notify: &Arc<Notify>,
+    ) {
+        for msg in buffer.drain(..) {
+            let size = msg.get_size_bytes().as_bytes_usize();
+            if let Err(err) = core
+                .send_internal(&msg.stream, &msg.topic, msg.messages, 
msg.partitioning.clone(), false)
+                .await
+            {
+                if let IggyError::ProducerSendFailed { failed, cause } = &err {
+                    let ctx = ErrorCtx {
+                        cause: cause.clone(),
+                        stream: msg.stream,
+                        topic: msg.topic,
+                        partitioning: msg.partitioning,
+                        messages: failed.clone(),
+                    };
+                    if err_sender.send_async(ctx).await.is_err() {
+                        tracing::warn!("error-queue receiver has been dropped; 
lost error report");
+                    }
+                } else {
+                    tracing::error!("background send failed: {err}");
+                }
+            }
+
+            global_buffer.fetch_sub(size, Ordering::Relaxed);
+            notify.notify_waiters();
+        }
+        *buffer_bytes = 0;
+    }
+
     pub fn new(
         core: Arc<ProducerCore>,
-        current_buffered: Arc<AtomicUsize>,
+        current_buffered_size: Arc<AtomicUsize>,
         notify: Arc<Notify>,
+        err_sender: flume::Sender<ErrorCtx>,
     ) -> Self {
         let (tx, rx) = flume::bounded::<ShardMessage>(10); // use from config
         let handle = tokio::spawn(async move {
             while let Ok(msg) = rx.recv_async().await {
                 let size = msg.get_size_bytes();
-                if let Err(e) = core
-                    .send_internal(&msg.stream, &msg.topic, msg.messages, 
msg.partitioning)
+                if let Err(err) = core
+                    .send_internal(
+                        &msg.stream,
+                        &msg.topic,
+                        msg.messages,
+                        msg.partitioning.clone(),
+                    )
                     .await
                 {
-                    // send to err chan
-                    // error!("{:?}", e);
+                    if let IggyError::ProducerSendFailed { failed, cause } = 
&err {
+                        let ctx = ErrorCtx {
+                            cause: cause.clone(),
+                            stream: msg.stream,
+                            topic: msg.topic,
+                            partitioning: msg.partitioning,
+                            messages: failed.clone(),
+                        };
+                        if err_sender.send_async(ctx).await.is_err() {
+                            tracing::warn!("error-queue receiver has been 
dropped; lost error report");
+                        }
+                    } else {
+                        tracing::error!("background send failed: {err}");
+                    }
                 }
-                current_buffered.fetch_sub(size.as_bytes_usize(), 
Ordering::Relaxed);
+                current_buffered_size.fetch_sub(size.as_bytes_usize(), 
Ordering::Relaxed);
                 notify.notify_waiters();
             }
         });
@@ -95,9 +221,14 @@ impl Shard {
             IggyError::BackgroundSendError
         })
     }
+
+    async fn shutdown(self) {
+        drop(self.tx);
+        let _ = self._handle.await;
+    }
 }
 
-pub trait Sharding {
+pub trait Sharding: Default + Send + Sync + 'static {
     fn pick_shard(
         &self,
         shards: &[Shard],
@@ -107,11 +238,11 @@ pub trait Sharding {
     ) -> usize;
 }
 
-pub struct RoundRobinSharding {
+pub struct BalancedSharding {
     counter: AtomicUsize,
 }
 
-impl Default for RoundRobinSharding {
+impl Default for BalancedSharding {
     fn default() -> Self {
         Self {
             counter: AtomicUsize::new(0),
@@ -119,7 +250,7 @@ impl Default for RoundRobinSharding {
     }
 }
 
-impl Sharding for RoundRobinSharding {
+impl Sharding for BalancedSharding {
     fn pick_shard(
         &self,
         shards: &[Shard],
@@ -131,55 +262,62 @@ impl Sharding for RoundRobinSharding {
     }
 }
 
-pub enum Backpressure {
-    /// Block until the send succeeds
-    Block,
-    /// Block with a timeout, after which the send fails
-    BlockWithTimeout(IggyDuration),
-    /// Fail immediately without retrying
-    FailImmediately,
-}
-
-struct ProducerDispatcher<S: Sharding> {
+pub struct ProducerDispatcher<S: Sharding, E: ErrorCallback> {
     core: Arc<ProducerCore>,
-    backpressure: Backpressure,
-    sharding: S,
     shards: Vec<Shard>,
-    current_buffered: Arc<AtomicUsize>,
+    current_buffered: Arc<AtomicUsize>, // TODO: add an analogous counter, but by length (in-flight)
     notify: Arc<Notify>,
-    config: Arc<BackgroundConfig>,
+    config: Arc<BackgroundConfig<S, E>>,
+    closed: AtomicBool,
+    _join_handle: JoinHandle<()>,
 }
 
-impl<S> ProducerDispatcher<S>
+impl<S, E> ProducerDispatcher<S, E>
 where
     S: Sharding,
+    E: ErrorCallback,
 {
     pub fn new(
         core: Arc<ProducerCore>,
-        backpressure: Backpressure,
-        config: Arc<BackgroundConfig>,
-        sharding: S,
+        config: BackgroundConfig<S, E>,
     ) -> Self {
         let mut shards = Vec::with_capacity(config.max_in_flight);
-        let current_buffered = Arc::new(AtomicUsize::new(0));
+        let current_buffered_size = Arc::new(AtomicUsize::new(0));
+        // let current_buffered_length = Arc::new(AtomicUsize::new(0));
         let notify = Arc::new(Notify::new());
+        let config = Arc::new(config);
+
+        let (err_tx, err_rx) = flume::unbounded::<ErrorCtx>();
+        let err_callback = config.error_callback.clone();
+        let handle = tokio::spawn(async move {
+            while let Ok(ctx) = err_rx.recv_async().await {
+                if let Err(panic) = 
std::panic::AssertUnwindSafe(err_callback.call(ctx))
+                    .catch_unwind()
+                    .await
+                {
+                    tracing::error!("error_callback panicked: {:?}", panic);
+                }
+            }
+            tracing::debug!("error-callback worker finished");
+        });
 
         for _ in 0..config.max_in_flight {
             shards.push(Shard::new(
                 core.clone(),
-                current_buffered.clone(),
+                current_buffered_size.clone(),
                 notify.clone(),
+                err_tx.clone(),
             ));
         }
 
         Self {
             core,
-            backpressure,
-            sharding,
             shards,
-            current_buffered,
+            current_buffered: current_buffered_size,
             config,
             notify,
+            closed: AtomicBool::new(false),
+            _join_handle: handle,
         }
     }
 
@@ -190,6 +328,10 @@ where
         topic: Arc<Identifier>,
         partitioning: Option<Arc<Partitioning>>,
     ) -> Result<(), IggyError> {
+        if self.closed.load(Ordering::Relaxed) {
+            return Err(IggyError::ProducerClosed);
+        }
+
         let shard_message = ShardMessage {
             messages,
             stream,
@@ -207,12 +349,12 @@ where
                 if buffer_size.as_bytes_usize() != 0
                     && reserved + batch_bytes.as_bytes_usize() > 
buffer_size.as_bytes_usize()
                 {
-                    match self.backpressure {
-                        Backpressure::Block => {
+                    match self.config.failure_mode {
+                        BackpressureMode::Block => {
                             self.notify.notified().await;
                             continue;
                         }
-                        Backpressure::BlockWithTimeout(t) => {
+                        BackpressureMode::BlockWithTimeout(t) => {
                             if tokio::time::timeout(t.get_duration(), 
self.notify.notified())
                                 .await
                                 .is_err()
@@ -221,7 +363,7 @@ where
                             }
                             continue;
                         }
-                        Backpressure::FailImmediately => {
+                        BackpressureMode::FailImmediately => {
                             return 
Err(IggyError::BackgroundSendBufferOverflow);
                         }
                     };
@@ -238,7 +380,7 @@ where
             }
         }
 
-        let shard_ix = self.sharding.pick_shard(
+        let shard_ix = self.config.sharding.pick_shard(
             &self.shards,
             &shard_message.messages,
             &shard_message.stream,
@@ -246,10 +388,12 @@ where
         );
         let shard = self.shards.get(shard_ix).unwrap();
 
-        let result = match self.backpressure {
-            Backpressure::Block => shard.send_with_block(shard_message).await,
-            Backpressure::BlockWithTimeout(t) => 
shard.send_with_timeout(shard_message, t).await,
-            Backpressure::FailImmediately => 
shard.send_with_fail(shard_message),
+        let result = match self.config.failure_mode {
+            BackpressureMode::Block => 
shard.send_with_block(shard_message).await,
+            BackpressureMode::BlockWithTimeout(t) => {
+                shard.send_with_timeout(shard_message, t).await
+            }
+            BackpressureMode::FailImmediately => 
shard.send_with_fail(shard_message),
         };
         if result.is_err() {
             self.current_buffered
@@ -257,4 +401,19 @@ where
         }
         result
     }
+
+    pub async fn shutdown(mut self) {
+        if self.closed.swap(true, Ordering::Relaxed) {
+            return;
+        }
+
+        let mut handles = Vec::with_capacity(self.shards.len());
+        for shard in self.shards.drain(..) {
+            handles.push(shard.shutdown());
+        }
+
+        futures::future::join_all(handles).await;
+        let _ = self._join_handle.await;
+        debug_assert_eq!(self.current_buffered.load(Ordering::Relaxed), 0);
+    }
 }
diff --git a/core/sdk/src/clients/send_mode.rs 
b/core/sdk/src/clients/send_mode.rs
deleted file mode 100644
index 63145db3..00000000
--- a/core/sdk/src/clients/send_mode.rs
+++ /dev/null
@@ -1,131 +0,0 @@
-use std::sync::{atomic::{AtomicUsize, Ordering}, Arc};
-
-use iggy_common::{Identifier, IggyByteSize, IggyDuration, IggyError, 
IggyMessage, Partitioning};
-use tokio::task::JoinHandle;
-use tracing::error;
-
-use super::producer::ProducerCore;
-
-#[derive(Debug, Clone, Default, PartialEq)]
-pub enum SendMode {
-    #[default]
-    Sync,
-    Background,
-}
-
-#[derive(Debug, Clone)]
-/// Determines how the `send_messages` API should behave when problem is 
encountered
-pub enum BackpressureMode {
-    /// Block until the send succeeds
-    Block,
-    /// Block with a timeout, after which the send fails
-    BlockWithTimeout(IggyDuration),
-    /// Fail immediately without retrying
-    FailImmediately,
-}
-
-#[derive(Debug, Clone)]
-pub struct BackgroundConfig {
-    pub max_in_flight: usize,
-    pub in_flight_timeout: Option<IggyDuration>,
-    pub batch_size: Option<usize>,
-    pub failure_mode: BackpressureMode,
-    pub buffer_size: Option<IggyByteSize>, // rename: maximum_buffer_size
-}
-
-pub struct shardMessage {
-    pub stream: Arc<Identifier>,
-    pub topic: Arc<Identifier>,
-    pub messages: Vec<IggyMessage>,
-    pub partitioning: Option<Arc<Partitioning>>,
-}
-
-struct shard {
-    core: Arc<ProducerCore>,
-    tx: flume::Sender<shardMessage>,
-    _join_handle: JoinHandle<()>,
-}
-
-impl shard {
-    fn new(id: usize, producer_core: Arc<ProducerCore>) -> Self {
-        let (tx, rx) = flume::bounded::<shardMessage>(10); // todo добавить 
размер в конфигурацию
-        let core = producer_core.clone();
-        let handle = tokio::spawn(async move {
-            while let Ok(message) = rx.recv_async().await {
-                // todo поменять на match
-                core.send_internal(&message.stream, &message.topic, 
message.messages, message.partitioning).await.map_err(|e| {
-                    error!("{e}");
-                }).unwrap();
-            }
-        });
-        Self {
-            core: producer_core,
-            tx,
-            _join_handle: handle,
-        }
-    }
-
-    async fn send(
-        &self,
-        message: shardMessage,
-    ) -> Result<(), IggyError> {
-        self.tx.send_async(message).await.map_err(|_| 
IggyError::BackgroundSendError)
-    }
-
-    async fn send_timeout(&self, message: shardMessage, timeout: IggyDuration) 
-> Result<(), IggyError> {
-        match tokio::time::timeout(timeout.get_duration(), 
self.tx.send_async(message)).await {
-            Ok(Ok(())) => Ok(()),
-            Ok(Err(e)) => Err(IggyError::BackgroundSendTimeout),
-            Err(_) => Err(IggyError::BackgroundSendTimeout)
-        }
-    }
-
-    fn send_with_fail(&self, message: shardMessage) -> Result<(), IggyError> {
-        self.tx.try_send(message).map_err(|_| IggyError::BackgroundSendError)
-    }
-}
-
-pub struct Dispatcher {
-    config: Arc<BackgroundConfig>,
-    sender: flume::Sender<shardMessage>,
-    _join_handle: JoinHandle<()>,
-}
-
-impl Dispatcher {
-    pub fn new(core: Arc<ProducerCore>, num_shards: usize, config: 
Arc<BackgroundConfig>) -> Self {
-        let mut shards = Vec::with_capacity(num_shards);
-        for i in 0..num_shards {
-            shards.push(shard::new(i, core.clone()));
-        }
-
-        let (tx, rx) = flume::bounded::<shardMessage>(0);
-        let sent = AtomicUsize::new(0);
-        let inner_config = config.clone();
-        let handle = tokio::spawn(async move {
-            loop {
-                if let Ok(msg) = rx.recv_async().await {
-                    let ix = sent.fetch_add(1, Ordering::SeqCst) % num_shards;
-                    let shard = shards.get(ix).unwrap();
-                    let result = match inner_config.failure_mode {
-                        BackpressureMode::Block => shard.send(msg).await,
-                        BackpressureMode::BlockWithTimeout(t) => 
shard.send_timeout(msg, t).await,
-                        BackpressureMode::FailImmediately => 
shard.send_with_fail(msg),
-                    };
-                    if let Err(e) = result {
-                        // todo добавить канал для ошибок
-                        error!("{}", e);
-                    }
-                }
-            }
-        });
-        Self {
-            config,
-            sender: tx,
-            _join_handle: handle,
-        }
-    }
-
-    pub async fn dispatch(&self, msg: shardMessage) -> Result<(), 
flume::SendError<shardMessage>> {
-        self.sender.send_async(msg).await
-    }
-}

Reply via email to