This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch feat/add-background-send
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/feat/add-background-send by 
this push:
     new 77c25b83 del
77c25b83 is described below

commit 77c25b83d1bb41d6efccf0e58d48240ef12696bf
Author: haze518 <[email protected]>
AuthorDate: Mon Jun 2 10:10:28 2025 +0600

    del
---
 core/sdk/src/clients/producer.rs            | 166 +++++++++++++++-------------
 core/sdk/src/clients/producer_builder.rs    |  36 +++---
 core/sdk/src/clients/producer_dispatcher.rs | 120 +++++++-------------
 3 files changed, 151 insertions(+), 171 deletions(-)

diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index ed22d5a3..cb0b77ce 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -17,6 +17,7 @@
  */
 use super::{MAX_BATCH_LENGTH, ORDERING};
 
+use super::producer_dispatcher::{BackgroundConfig, BalancedSharding, ProducerDispatcher, Sharding};
 use bytes::Bytes;
 use futures_util::StreamExt;
 use iggy_binary_protocol::Client;
@@ -25,9 +26,7 @@ use iggy_common::{
     CompressionAlgorithm, DiagnosticEvent, EncryptorKind, IdKind, Identifier, IggyDuration,
     IggyError, IggyExpiry, IggyMessage, IggyTimestamp, MaxTopicSize, Partitioner, Partitioning,
 };
-use super::producer_dispatcher::{ProducerDispatcher, BalancedSharding, Sharding};
 use std::fmt::Debug;
-use std::marker::PhantomData;
 use std::sync::Arc;
 use std::sync::atomic::Ordering;
 use std::sync::atomic::{AtomicBool, AtomicU64};
@@ -49,6 +48,11 @@ pub trait AsyncSendStrategy: SendStrategy {
     fn flush(&self) -> Result<(), IggyError>;
 }
 
+pub(crate) enum SendMode {
+    Direct,
+    Chunked,
+}
+
 pub struct ProducerCore {
     initialized: AtomicBool,
     can_send: Arc<AtomicBool>,
@@ -184,74 +188,77 @@ impl ProducerCore {
         });
     }
 
-// если не надо будет разбивать на чанки, то можем не кидать msg в ошибке
-// достаточно будет добавить функцию, которая будет принимать массив.
-// Тогда мы сможем передавать массив для отправки туда, при этом сохраняя доступ до сообщений.
-// Надо будет подумать над этим.
-// 
-pub(crate) async fn send_internal(
-    &self,
-    stream: &Identifier,
-    topic:  &Identifier,
-    mut msgs: Vec<IggyMessage>,
-    partitioning: Option<Arc<Partitioning>>,
-    split_chunks: bool
-) -> Result<(), IggyError> {
-    if msgs.is_empty() {
-        return Ok(());
-    }
-
-    if let Err(err) = self.encrypt_messages(&mut msgs) {
-        return Err(IggyError::ProducerSendFailed {
-            cause:  err.to_string(),
-            failed: Arc::new(msgs),
-        });
-    }
+    pub(crate) async fn send_internal(
+        &self,
+        stream: &Identifier,
+        topic: &Identifier,
+        mut msgs: Vec<IggyMessage>,
+        partitioning: Option<Arc<Partitioning>>,
+        mode: SendMode,
+    ) -> Result<(), IggyError> {
+        if msgs.is_empty() {
+            return Ok(());
+        }
 
-    let part = match self.get_partitioning(stream, topic, &msgs, partitioning.clone()) {
-        Ok(p)  => p,
-        Err(err) => {
+        if let Err(err) = self.encrypt_messages(&mut msgs) {
             return Err(IggyError::ProducerSendFailed {
-                cause:  err.to_string(),
+                cause: err.to_string(),
                 failed: Arc::new(msgs),
             });
         }
-    };
-
-    if !self.can_send_immediately && self.linger_time_micros > 0 {
-        Self::wait_before_sending(self.linger_time_micros,
-                                  self.last_sent_at.load(ORDERING)).await;
-    }
-
-    if !split_chunks {
-        self.try_send_messages(stream, topic, &part, &mut msgs).await.map_err(|err| IggyError::ProducerSendFailed {
-                cause:  err.to_string(),
-                failed: Arc::new(msgs),
-        })?;
-        self.last_sent_at.store(IggyTimestamp::now().into(), ORDERING);
-        return Ok(())
-    }
 
-    let max = self.batch_length.unwrap_or(MAX_BATCH_LENGTH);
+        let part = match self.get_partitioning(stream, topic, &msgs, partitioning.clone()) {
+            Ok(p) => p,
+            Err(err) => {
+                return Err(IggyError::ProducerSendFailed {
+                    cause: err.to_string(),
+                    failed: Arc::new(msgs),
+                });
+            }
+        };
 
-    let mut index = 0;
-    while index < msgs.len() {
-        let end   = (index + max).min(msgs.len());
-        let chunk = &mut msgs[index..end];
+        match mode {
+            SendMode::Direct => {
+                self.try_send_messages(stream, topic, &part, &mut msgs)
+                    .await
+                    .map_err(|err| IggyError::ProducerSendFailed {
+                        cause: err.to_string(),
+                        failed: Arc::new(msgs),
+                    })?;
+                self.last_sent_at
+                    .store(IggyTimestamp::now().into(), ORDERING);
+            }
+            SendMode::Chunked => {
+                if !self.can_send_immediately && self.linger_time_micros > 0 {
+                    Self::wait_before_sending(
+                        self.linger_time_micros,
+                        self.last_sent_at.load(ORDERING),
+                    )
+                    .await;
+                }
 
-        if let Err(err) = self.try_send_messages(stream, topic, &part, chunk).await {
-            let failed_tail = msgs.split_off(index);
-            return Err(IggyError::ProducerSendFailed {
-                cause:  err.to_string(),
-                failed: Arc::new(failed_tail),
-            });
+                let max = self.batch_length.unwrap_or(MAX_BATCH_LENGTH);
+                let mut index = 0;
+                while index < msgs.len() {
+                    let end = (index + max).min(msgs.len());
+                    let chunk = &mut msgs[index..end];
+
+                    if let Err(err) = self.try_send_messages(stream, topic, &part, chunk).await {
+                        let failed_tail = msgs.split_off(index);
+                        return Err(IggyError::ProducerSendFailed {
+                            cause: err.to_string(),
+                            failed: Arc::new(failed_tail),
+                        });
+                    }
+                    self.last_sent_at
+                        .store(IggyTimestamp::now().into(), ORDERING);
+                    index = end;
+                }
+            }
         }
-        self.last_sent_at.store(IggyTimestamp::now().into(), ORDERING);
-        index = end;
-    }
 
-    Ok(())
-}
+        Ok(())
+    }
 
     async fn try_send_messages(
         &self,
@@ -424,11 +431,11 @@ pub(crate) async fn send_internal(
 }
 
 pub struct ErrorCtx {
-    pub cause:        String,
-    pub stream:       Arc<Identifier>,
-    pub topic:        Arc<Identifier>,
+    pub cause: String,
+    pub stream: Arc<Identifier>,
+    pub topic: Arc<Identifier>,
     pub partitioning: Option<Arc<Partitioning>>,
-    pub messages:     Arc<Vec<IggyMessage>>,
+    pub messages: Arc<Vec<IggyMessage>>,
 }
 
 pub trait ErrorCallback: Send + Sync + Debug + 'static {
@@ -446,7 +453,8 @@ impl std::fmt::Debug for LogErrorCallback {
 impl ErrorCallback for LogErrorCallback {
     fn call(&self, ctx: ErrorCtx) -> impl Future<Output = ()> + Send {
         async move {
-            let partitioning = ctx.partitioning
+            let partitioning = ctx
+                .partitioning
                 .as_ref()
                 .map(|p| format!("{:?}", p))
                 .unwrap_or_else(|| "None".to_string());
@@ -463,15 +471,12 @@ impl ErrorCallback for LogErrorCallback {
     }
 }
 
-
 unsafe impl<S: Sharding, E: ErrorCallback> Send for IggyProducer<S, E> {}
 unsafe impl<S: Sharding, E: ErrorCallback> Sync for IggyProducer<S, E> {}
 
 pub struct IggyProducer<S: Sharding, E: ErrorCallback> {
     core: Arc<ProducerCore>,
     dispatcher: Option<ProducerDispatcher<S, E>>,
-
-    _phantom: PhantomData<E>,
 }
 
 impl<S, E> IggyProducer<S, E>
@@ -527,10 +532,8 @@ where
             send_retries_interval,
         });
         let dispatcher = match background_config {
-            Some(config) => {
-                Some(ProducerDispatcher::new(core.clone(), config))
-            }
-            None => None
+            Some(config) => Some(ProducerDispatcher::new(core.clone(), config)),
+            None => None,
         };
 
         Self {
@@ -565,7 +568,11 @@ where
 
         match &self.dispatcher {
             Some(disp) => disp.dispatch(messages, stream_id, topic_id, None).await,
-            None => self.core.send_internal(&stream_id, &topic_id, messages, None, true).await,
+            None => {
+                self.core
+                    .send_internal(&stream_id, &topic_id, messages, None, SendMode::Chunked)
+                    .await
+            }
         }
     }
 
@@ -587,8 +594,15 @@ where
         let topic_id = self.core.topic_id.clone();
 
         match &self.dispatcher {
-            Some(disp) => disp.dispatch(messages, stream_id, topic_id, partitioning).await,
-            None => self.core.send_internal(&stream_id, &topic_id, messages, partitioning, true).await,
+            Some(disp) => {
+                disp.dispatch(messages, stream_id, topic_id, partitioning)
+                    .await
+            }
+            None => {
+                self.core
+                    .send_internal(&stream_id, &topic_id, messages, partitioning, SendMode::Chunked)
+                    .await
+            }
         }
     }
 
@@ -606,7 +620,7 @@ where
 
         // todo add send via dispatcher
         self.core
-            .send_internal(&stream, &topic, messages, partitioning, true)
+            .send_internal(&stream, &topic, messages, partitioning, SendMode::Chunked)
             .await
     }
 
diff --git a/core/sdk/src/clients/producer_builder.rs b/core/sdk/src/clients/producer_builder.rs
index 89843800..3cadd295 100644
--- a/core/sdk/src/clients/producer_builder.rs
+++ b/core/sdk/src/clients/producer_builder.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use super::producer_dispatcher::{BackgroundConfig, Sharding};
 use super::MAX_BATCH_LENGTH;
 use crate::prelude::{IggyProducer, ErrorCallback};
 use iggy_binary_protocol::Client;
@@ -24,8 +25,18 @@ use iggy_common::{
 };
 use std::sync::Arc;
 
+pub struct Unset;
+pub struct Sync;
+pub struct Bg;
+
+
+
 #[derive(Debug)]
-pub struct IggyProducerBuilder {
+pub struct IggyProducerBuilder<S, E>
+where
+    S: Sharding,
+    E: ErrorCallback,
+{
     client: IggySharedMut<Box<dyn Client>>,
     stream: Identifier,
     stream_name: String,
@@ -44,10 +55,14 @@ pub struct IggyProducerBuilder {
     send_retries_interval: Option<IggyDuration>,
     topic_message_expiry: IggyExpiry,
     topic_max_size: MaxTopicSize,
-    error_callback: Option<Arc<dyn ErrorCallback>>
+    background_config: Option<BackgroundConfig<S, E>>,
 }
 
-impl IggyProducerBuilder {
+impl<S, E> IggyProducerBuilder<S, E>
+where
+    S: Sharding,
+    E: ErrorCallback,
+{
     #[allow(clippy::too_many_arguments)]
     pub(crate) fn new(
         client: IggySharedMut<Box<dyn Client>>,
@@ -77,7 +92,7 @@ impl IggyProducerBuilder {
             topic_max_size: MaxTopicSize::ServerDefault,
             send_retries_count: Some(3),
             send_retries_interval: Some(IggyDuration::ONE_SECOND),
-            error_callback: None,
+            background_config: None,
         }
     }
 
@@ -103,6 +118,10 @@ impl IggyProducerBuilder {
         }
     }
 
+    pub fn with_background_config(self) -> Self {
+
+    }
+
     /// Clears the batch size.
     pub fn without_batch_length(self) -> Self {
         Self {
@@ -127,13 +146,6 @@ impl IggyProducerBuilder {
         }
     }
 
-    pub fn error_callback(self, cb: Arc<dyn ErrorCallback>) -> Self {
-        Self {
-            error_callback: Some(cb),
-            ..self
-        }
-    }
-
     /// Sets the encryptor for encrypting the messages' payloads.
     pub fn encryptor(self, encryptor: Arc<EncryptorKind>) -> Self {
         Self {
@@ -258,8 +270,6 @@ impl IggyProducerBuilder {
             self.topic_max_size,
             self.send_retries_count,
             self.send_retries_interval,
-            self.send_mode,
-            self.error_callback,
         )
     }
 }
diff --git a/core/sdk/src/clients/producer_dispatcher.rs b/core/sdk/src/clients/producer_dispatcher.rs
index 818c958c..ecc19212 100644
--- a/core/sdk/src/clients/producer_dispatcher.rs
+++ b/core/sdk/src/clients/producer_dispatcher.rs
@@ -1,13 +1,18 @@
-use std::{sync::{
-    atomic::{AtomicBool, AtomicUsize, Ordering}, Arc
-}, time::Duration};
+use std::{
+    sync::{
+        Arc,
+        atomic::{AtomicBool, AtomicUsize, Ordering},
+    },
+    time::Duration,
+};
 
+use futures::FutureExt;
 use iggy_common::{
     Identifier, IggyByteSize, IggyDuration, IggyError, IggyMessage, Partitioning, Sizeable,
 };
 use tokio::{sync::Notify, task::JoinHandle};
 use tracing::error;
-use futures::FutureExt;
+use super::producer::{ErrorCtx, ProducerCore, SendMode};
 
 use crate::prelude::ErrorCallback;
 
@@ -25,18 +30,15 @@ pub enum BackpressureMode {
 #[derive(Debug, Clone)]
 pub struct BackgroundConfig<S: Sharding, E: ErrorCallback> {
     pub max_in_flight: usize,
-    pub in_flight_timeout: Option<IggyDuration>,
     pub batch_size: Option<usize>,
     pub batch_length: Option<usize>,
     pub failure_mode: BackpressureMode,
-    pub buffer_size: Option<IggyByteSize>,
-    pub linger_time: Option<IggyDuration>,
+    pub max_buffer_size: Option<IggyByteSize>,
+    pub linger_time: IggyDuration,
     pub error_callback: Arc<E>,
     pub sharding: Box<S>,
 }
 
-use super::{producer::{ErrorCtx, ProducerCore}};
-
 #[derive(Debug)]
 pub struct ShardMessage {
     pub stream: Arc<Identifier>,
@@ -64,7 +66,7 @@ pub struct Shard {
 }
 
 impl Shard {
-    pub fn new_(
+    pub fn new(
         core: Arc<ProducerCore>,
         global_buffer: Arc<AtomicUsize>,
         config: Arc<BackgroundConfig<impl Sharding, impl ErrorCallback>>,
@@ -75,10 +77,11 @@ impl Shard {
 
         let handle = tokio::spawn(async move {
             let mut buffer = Vec::new();
-            let mut buffer_bytes = 0usize;
+            let mut buffer_bytes = 0;
             let mut last_flush = tokio::time::Instant::now();
 
             loop {
+                let deadline = last_flush + config.linger_time.get_duration();
                 tokio::select! {
                     maybe_msg = rx.recv_async() => {
                         match maybe_msg {
@@ -99,7 +102,7 @@ impl Shard {
                             Err(_) => break,
                         }
                     }
-                    _ = tokio::time::sleep(config.linger_time.map_or(Duration::from_secs(u64::MAX), |d| d.get_duration())) => {
+                    _ = tokio::time::sleep_until(deadline) => {
                         if !buffer.is_empty() {
                             Self::flush_buffer(&core, &mut buffer, &mut buffer_bytes, &global_buffer, &err_sender, &notify).await;
                             last_flush = tokio::time::Instant::now();
@@ -108,6 +111,11 @@ impl Shard {
                 }
             }
         });
+
+        Self {
+            tx,
+            _handle: handle,
+        }
     }
 
     async fn flush_buffer(
@@ -121,7 +129,13 @@ impl Shard {
         for msg in buffer.drain(..) {
             let size = msg.get_size_bytes().as_bytes_usize();
             if let Err(err) = core
-                .send_internal(&msg.stream, &msg.topic, msg.messages, msg.partitioning.clone(), false)
+                .send_internal(
+                    &msg.stream,
+                    &msg.topic,
+                    msg.messages,
+                    msg.partitioning.clone(),
+                    SendMode::Direct,
+                )
                 .await
             {
                 if let IggyError::ProducerSendFailed { failed, cause } = &err {
@@ -146,50 +160,6 @@ impl Shard {
         *buffer_bytes = 0;
     }
 
-    pub fn new(
-        core: Arc<ProducerCore>,
-        current_buffered_size: Arc<AtomicUsize>,
-        notify: Arc<Notify>,
-        err_sender: flume::Sender<ErrorCtx>,
-    ) -> Self {
-        let (tx, rx) = flume::bounded::<ShardMessage>(10); // use from config
-        let handle = tokio::spawn(async move {
-            while let Ok(msg) = rx.recv_async().await {
-                let size = msg.get_size_bytes();
-                if let Err(err) = core
-                    .send_internal(
-                        &msg.stream,
-                        &msg.topic,
-                        msg.messages,
-                        msg.partitioning.clone(),
-                    )
-                    .await
-                {
-                    if let IggyError::ProducerSendFailed { failed, cause } = &err {
-                        let ctx = ErrorCtx {
-                            cause: cause.clone(),
-                            stream: msg.stream,
-                            topic: msg.topic,
-                            partitioning: msg.partitioning,
-                            messages: failed.clone(),
-                        };
-                        if err_sender.send_async(ctx).await.is_err() {
-                            tracing::warn!("error-queue receiver has been dropped; lost error report");
-                        }
-                    } else {
-                        tracing::error!("background send failed: {err}");
-                    }
-                }
-                current_buffered_size.fetch_sub(size.as_bytes_usize(), Ordering::Relaxed);
-                notify.notify_waiters();
-            }
-        });
-        Self {
-            tx,
-            _handle: handle,
-        }
-    }
-
     async fn send_with_block(&self, message: ShardMessage) -> Result<(), IggyError> {
         self.tx.send_async(message).await.map_err(|e| {
             error!("Failed to send_with_block: {e}");
@@ -265,7 +235,7 @@ impl Sharding for BalancedSharding {
 pub struct ProducerDispatcher<S: Sharding, E: ErrorCallback> {
     core: Arc<ProducerCore>,
     shards: Vec<Shard>,
-    current_buffered: Arc<AtomicUsize>, // todo добавить аналог, но по length(in-flight)
+    global_buffer: Arc<AtomicUsize>,
     notify: Arc<Notify>,
     config: Arc<BackgroundConfig<S, E>>,
     closed: AtomicBool,
@@ -277,13 +247,9 @@ where
     S: Sharding,
     E: ErrorCallback,
 {
-    pub fn new(
-        core: Arc<ProducerCore>,
-        config: BackgroundConfig<S, E>,
-    ) -> Self {
+    pub fn new(core: Arc<ProducerCore>, config: BackgroundConfig<S, E>) -> Self {
         let mut shards = Vec::with_capacity(config.max_in_flight);
         let current_buffered_size = Arc::new(AtomicUsize::new(0));
-        // let current_buffered_length = Arc::new(AtomicUsize::new(0));
         let notify = Arc::new(Notify::new());
         let config = Arc::new(config);
 
@@ -305,6 +271,7 @@ where
             shards.push(Shard::new(
                 core.clone(),
                 current_buffered_size.clone(),
+                config.clone(),
                 notify.clone(),
                 err_tx.clone(),
             ));
@@ -313,7 +280,7 @@ where
         Self {
             core,
             shards,
-            current_buffered: current_buffered_size,
+            global_buffer: current_buffered_size,
             config,
             notify,
             closed: AtomicBool::new(false),
@@ -340,8 +307,8 @@ where
         };
         let batch_bytes = shard_message.get_size_bytes();
 
-        let mut reserved = self.current_buffered.load(Ordering::Relaxed);
-        if let Some(buffer_size) = &self.config.buffer_size {
+        let mut reserved = self.global_buffer.load(Ordering::Relaxed);
+        if let Some(buffer_size) = &self.config.max_buffer_size {
             if batch_bytes.as_bytes_usize() > buffer_size.as_bytes_usize() {
                 return Err(IggyError::BackgroundSendBufferOverflow);
             }
@@ -368,7 +335,7 @@ where
                         }
                     };
                 }
-                match self.current_buffered.compare_exchange(
+                match self.global_buffer.compare_exchange(
                     reserved,
                     reserved + batch_bytes.as_bytes_usize(),
                     Ordering::AcqRel,
@@ -386,20 +353,9 @@ where
             &shard_message.stream,
             &shard_message.topic,
         );
-        let shard = self.shards.get(shard_ix).unwrap();
-
-        let result = match self.config.failure_mode {
-            BackpressureMode::Block => shard.send_with_block(shard_message).await,
-            BackpressureMode::BlockWithTimeout(t) => {
-                shard.send_with_timeout(shard_message, t).await
-            }
-            BackpressureMode::FailImmediately => shard.send_with_fail(shard_message),
-        };
-        if result.is_err() {
-            self.current_buffered
-                .fetch_sub(batch_bytes.as_bytes_usize(), Ordering::Relaxed);
-        }
-        result
+        debug_assert!(shard_ix < self.shards.len());
+        let shard = &self.shards[shard_ix];
+        shard.send_with_block(shard_message).await
     }
 
     pub async fn shutdown(mut self) {
@@ -414,6 +370,6 @@ where
 
         futures::future::join_all(handles).await;
         let _ = self._join_handle.await;
-        debug_assert_eq!(self.current_buffered.load(Ordering::Relaxed), 0);
+        debug_assert_eq!(self.global_buffer.load(Ordering::Relaxed), 0);
     }
 }

Reply via email to