This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch feat/add-background-send
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit c3069208311531049e4db5e45b93c2850195a70e
Author: haze518 <[email protected]>
AuthorDate: Thu May 29 20:45:53 2025 +0600

    del
---
 core/common/src/error/iggy_error.rs         |   2 +
 core/sdk/src/clients/mod.rs                 |   1 +
 core/sdk/src/clients/producer.rs            | 120 +++++++------
 core/sdk/src/clients/producer_dispatcher.rs | 260 ++++++++++++++++++++++++++++
 core/sdk/src/clients/send_mode.rs           |   3 +-
 5 files changed, 323 insertions(+), 63 deletions(-)

diff --git a/core/common/src/error/iggy_error.rs 
b/core/common/src/error/iggy_error.rs
index 5e812288..9fe20359 100644
--- a/core/common/src/error/iggy_error.rs
+++ b/core/common/src/error/iggy_error.rs
@@ -369,6 +369,8 @@ pub enum IggyError {
     BackgroundSendBufferFull = 4053,
     #[error("Background worker disconnected")]
     BackgroundWorkerDisconnected = 4054,
+    #[error("Background send buffer overflow")]
+    BackgroundSendBufferOverflow = 4055,
     #[error("Invalid offset: {0}")]
     InvalidOffset(u64) = 4100,
     #[error("Consumer group with ID: {0} for topic with ID: {1} was not 
found.")]
diff --git a/core/sdk/src/clients/mod.rs b/core/sdk/src/clients/mod.rs
index 7067d2c2..6f8f5f7c 100644
--- a/core/sdk/src/clients/mod.rs
+++ b/core/sdk/src/clients/mod.rs
@@ -33,6 +33,7 @@ pub mod consumer_builder;
 pub mod producer;
 pub mod producer_builder;
 pub mod send_mode;
+mod producer_dispatcher;
 
 const ORDERING: std::sync::atomic::Ordering = 
std::sync::atomic::Ordering::SeqCst;
 const MAX_BATCH_LENGTH: usize = 1000000;
diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index ba2ccc94..4ede13f3 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -1,4 +1,4 @@
-use super::send_mode::{shardMessage, BackgroundConfig, BackpressureMode, 
Dispatcher, SendMode};
+use super::send_mode::{BackgroundConfig, BackpressureMode, Dispatcher, 
SendMode, shardMessage};
 /* Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -70,10 +70,11 @@ pub struct ProducerCore {
 
 impl ProducerCore {
     pub async fn init(&self) -> Result<(), IggyError> {
-        if self.initialized.compare_exchange(
-            false, true,
-            Ordering::SeqCst, Ordering::SeqCst,
-        ).is_err() {
+        if self
+            .initialized
+            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
+            .is_err()
+        {
             return Ok(());
         }
 
@@ -191,6 +192,7 @@ impl ProducerCore {
 
         let part = self.get_partitioning(stream, topic, &msgs, partitioning)?;
 
+        // todo add batch_size or batch_length
         let max = self.batch_length.unwrap_or(MAX_BATCH_LENGTH);
 
         if !self.can_send_immediately && self.linger_time_micros > 0 {
@@ -376,7 +378,6 @@ impl ProducerCore {
     }
 }
 
-
 pub trait ErrorCallback: Send + Sync + Debug {
     fn call(&self, error: IggyError, messages: Vec<IggyMessage>);
 }
@@ -415,37 +416,37 @@ impl IggyProducer {
         error_callback: Option<Arc<dyn ErrorCallback>>,
     ) -> Self {
         let core = Arc::new(ProducerCore {
-                initialized: AtomicBool::new(true),
-                client: Arc::new(client),
-                can_send: Arc::new(AtomicBool::new(true)),
-                stream_id: Arc::new(stream),
-                stream_name,
-                topic_id: Arc::new(topic),
-                topic_name,
-                batch_length,
-                partitioning: partitioning.map(Arc::new),
-                encryptor,
-                partitioner,
-                linger_time_micros: linger_time.map_or(0, |i| i.as_micros()),
-                create_stream_if_not_exists,
-                create_topic_if_not_exists,
-                topic_partitions_count,
-                topic_replication_factor,
-                topic_message_expiry,
-                topic_max_size,
-                default_partitioning: Arc::new(Partitioning::balanced()),
-                can_send_immediately: linger_time.is_none(),
-                last_sent_at: Arc::new(AtomicU64::new(0)),
-                send_retries_count,
-                send_retries_interval,
-                _join_handle: None,
-                sema: Arc::new(Semaphore::new(10)),
-                sender: None,
-                error_callback,
-                shard_number: default_shard_count(),
-            });
+            initialized: AtomicBool::new(true),
+            client: Arc::new(client),
+            can_send: Arc::new(AtomicBool::new(true)),
+            stream_id: Arc::new(stream),
+            stream_name,
+            topic_id: Arc::new(topic),
+            topic_name,
+            batch_length,
+            partitioning: partitioning.map(Arc::new),
+            encryptor,
+            partitioner,
+            linger_time_micros: linger_time.map_or(0, |i| i.as_micros()),
+            create_stream_if_not_exists,
+            create_topic_if_not_exists,
+            topic_partitions_count,
+            topic_replication_factor,
+            topic_message_expiry,
+            topic_max_size,
+            default_partitioning: Arc::new(Partitioning::balanced()),
+            can_send_immediately: linger_time.is_none(),
+            last_sent_at: Arc::new(AtomicU64::new(0)),
+            send_retries_count,
+            send_retries_interval,
+            _join_handle: None,
+            sema: Arc::new(Semaphore::new(10)),
+            sender: None,
+            error_callback,
+            shard_number: default_shard_count(),
+        });
        let num_shards = default_shard_count(); // TODO: replace with a proper configured value later
-        let config = BackgroundConfig{
+        let config = BackgroundConfig {
             max_in_flight: 4,
             in_flight_timeout: None,
             batch_size: None,
@@ -489,24 +490,23 @@ impl IggyProducer {
 
         match &self.send_mode {
             SendMode::Sync => {
-                self.core.send_internal(&stream_id, &topic_id, messages, 
None).await
-
-            }
-            SendMode::Background => {
-                match &self.dispatcher {
-                    Some(disp) => {
-                        disp.dispatch(shardMessage{
-                            messages,
-                            stream: stream_id,
-                            topic: topic_id,
-                            partitioning: None,
-                        }).await.map_err(|err| IggyError::BackgroundSendError)
-                    }
-                    None => Ok(())
-                }
+                self.core
+                    .send_internal(&stream_id, &topic_id, messages, None)
+                    .await
             }
+            SendMode::Background => match &self.dispatcher {
+                Some(disp) => disp
+                    .dispatch(shardMessage {
+                        messages,
+                        stream: stream_id,
+                        topic: topic_id,
+                        partitioning: None,
+                    })
+                    .await
+                    .map_err(|err| IggyError::BackgroundSendError),
+                None => Ok(()),
+            },
         }
-
     }
 
     pub async fn send_one(&self, message: IggyMessage) -> Result<(), 
IggyError> {
@@ -526,7 +526,9 @@ impl IggyProducer {
         let stream_id = &self.core.stream_id;
         let topic_id = &self.core.topic_id;
 
-        self.core.send_internal(stream_id, topic_id, messages, 
partitioning).await
+        self.core
+            .send_internal(stream_id, topic_id, messages, partitioning)
+            .await
     }
 
     pub async fn send_to(
@@ -541,7 +543,9 @@ impl IggyProducer {
             return Ok(());
         }
 
-        self.core.send_internal(&stream, &topic, messages, partitioning).await
+        self.core
+            .send_internal(&stream, &topic, messages, partitioning)
+            .await
     }
 }
 
@@ -549,11 +553,3 @@ fn default_shard_count() -> usize {
     let cpus = num_cpus::get();
     cpus.clamp(2, 16)
 }
-
-fn default_sharder(msg: &IggyMessage) -> &[u8] {
-    if let Some(h) = &msg.user_headers {
-        return &h[..h.len().min(16)];
-    }
-
-    &msg.payload[..msg.payload.len().min(16)]
-}
diff --git a/core/sdk/src/clients/producer_dispatcher.rs 
b/core/sdk/src/clients/producer_dispatcher.rs
new file mode 100644
index 00000000..2f6a25d7
--- /dev/null
+++ b/core/sdk/src/clients/producer_dispatcher.rs
@@ -0,0 +1,260 @@
+use std::sync::{
+    Arc,
+    atomic::{AtomicUsize, Ordering},
+};
+
+use iggy_common::{
+    Identifier, IggyByteSize, IggyDuration, IggyError, IggyMessage, 
Partitioning, Sizeable,
+};
+use tokio::{sync::Notify, task::JoinHandle};
+use tracing::error;
+
+use super::{producer::ProducerCore, send_mode::BackgroundConfig};
+
/// A batch of messages routed to one background shard worker, together with
/// the destination stream/topic identifiers and optional partitioning.
#[derive(Debug)]
pub struct ShardMessage {
    pub stream: Arc<Identifier>,
    pub topic: Arc<Identifier>,
    pub messages: Vec<IggyMessage>,
    pub partitioning: Option<Arc<Partitioning>>,
}
+
+impl Sizeable for ShardMessage {
+    fn get_size_bytes(&self) -> IggyByteSize {
+        let mut total = IggyByteSize::new(0);
+
+        total += self.stream.get_size_bytes();
+        total += self.topic.get_size_bytes();
+        for msg in &self.messages {
+            total += msg.get_size_bytes();
+        }
+        total
+    }
+}
+
/// One background worker: a bounded channel feeding a spawned task that
/// performs the actual sends.
pub struct Shard {
    tx: flume::Sender<ShardMessage>,
    // Handle to the spawned worker task; never joined. Dropping a tokio
    // JoinHandle detaches the task rather than aborting it.
    _handle: JoinHandle<()>,
}
+
+impl Shard {
+    pub fn new(
+        core: Arc<ProducerCore>,
+        current_buffered: Arc<AtomicUsize>,
+        notify: Arc<Notify>,
+    ) -> Self {
+        let (tx, rx) = flume::bounded::<ShardMessage>(10); // use from config
+        let handle = tokio::spawn(async move {
+            while let Ok(msg) = rx.recv_async().await {
+                let size = msg.get_size_bytes();
+                if let Err(e) = core
+                    .send_internal(&msg.stream, &msg.topic, msg.messages, 
msg.partitioning)
+                    .await
+                {
+                    // send to err chan
+                    // error!("{:?}", e);
+                }
+                current_buffered.fetch_sub(size.as_bytes_usize(), 
Ordering::Relaxed);
+                notify.notify_waiters();
+            }
+        });
+        Self {
+            tx,
+            _handle: handle,
+        }
+    }
+
+    async fn send_with_block(&self, message: ShardMessage) -> Result<(), 
IggyError> {
+        self.tx.send_async(message).await.map_err(|e| {
+            error!("Failed to send_with_block: {e}");
+            IggyError::BackgroundSendError
+        })
+    }
+
+    async fn send_with_timeout(
+        &self,
+        message: ShardMessage,
+        timeout: IggyDuration,
+    ) -> Result<(), IggyError> {
+        match tokio::time::timeout(timeout.get_duration(), 
self.tx.send_async(message)).await {
+            Ok(Ok(())) => Ok(()),
+            Ok(Err(e)) => {
+                error!("Channel send failed during timeout: {e}");
+                Err(IggyError::BackgroundSendTimeout)
+            }
+            Err(_) => {
+                error!("Timeout elapsed before sending message batch");
+                Err(IggyError::BackgroundSendTimeout)
+            }
+        }
+    }
+
+    fn send_with_fail(&self, message: ShardMessage) -> Result<(), IggyError> {
+        self.tx.try_send(message).map_err(|_| {
+            error!("Channel is full, dropping message batch");
+            IggyError::BackgroundSendError
+        })
+    }
+}
+
/// Strategy for choosing which shard a batch is routed to.
pub trait Sharding {
    /// Returns the index into `shards` for this batch. Implementations may
    /// inspect the messages and destination identifiers, or ignore them.
    fn pick_shard(
        &self,
        shards: &[Shard],
        messages: &[IggyMessage],
        stream: &Identifier,
        topic: &Identifier,
    ) -> usize;
}
+
/// Round-robin shard selection: successive batches go to successive shards.
///
/// The hand-written `Default` impl was redundant — `AtomicUsize` already
/// defaults to 0, so deriving is equivalent.
#[derive(Default)]
pub struct RoundRobinSharding {
    // Monotonic pick counter; wrapping at usize::MAX only perturbs the
    // rotation for a single step.
    counter: AtomicUsize,
}
+
impl Sharding for RoundRobinSharding {
    /// Picks shards in rotation, ignoring message contents and destination.
    /// NOTE(review): panics (mod by zero) if `shards` is empty — callers must
    /// guarantee at least one shard.
    fn pick_shard(
        &self,
        shards: &[Shard],
        _: &[IggyMessage],
        _: &Identifier,
        _: &Identifier,
    ) -> usize {
        self.counter.fetch_add(1, Ordering::Relaxed) % shards.len()
    }
}
+
/// Policy applied when the buffered-bytes limit is reached or a shard's
/// channel is full.
pub enum Backpressure {
    /// Block until the send succeeds
    Block,
    /// Block with a timeout, after which the send fails
    BlockWithTimeout(IggyDuration),
    /// Fail immediately without retrying
    FailImmediately,
}
+
/// Routes message batches to a fixed set of background shards while enforcing
/// an optional byte-based buffer limit under the configured backpressure policy.
struct ProducerDispatcher<S: Sharding> {
    // NOTE(review): only read in `new` (cloned into each shard); appears unused
    // afterwards — confirm it is needed as a field.
    core: Arc<ProducerCore>,
    backpressure: Backpressure,
    sharding: S,
    shards: Vec<Shard>,
    // Bytes currently enqueued across all shards; decremented by shard workers.
    current_buffered: Arc<AtomicUsize>,
    // Signaled by shard workers whenever buffered bytes shrink.
    notify: Arc<Notify>,
    config: Arc<BackgroundConfig>,
}
+
+impl<S> ProducerDispatcher<S>
+where
+    S: Sharding,
+{
+    pub fn new(
+        core: Arc<ProducerCore>,
+        backpressure: Backpressure,
+        config: Arc<BackgroundConfig>,
+        sharding: S,
+    ) -> Self {
+        let mut shards = Vec::with_capacity(config.max_in_flight);
+        let current_buffered = Arc::new(AtomicUsize::new(0));
+        let notify = Arc::new(Notify::new());
+
+        for _ in 0..config.max_in_flight {
+            shards.push(Shard::new(
+                core.clone(),
+                current_buffered.clone(),
+                notify.clone(),
+            ));
+        }
+
+        Self {
+            core,
+            backpressure,
+            sharding,
+            shards,
+            current_buffered,
+            config,
+            notify,
+        }
+    }
+
+    pub async fn dispatch(
+        &self,
+        messages: Vec<IggyMessage>,
+        stream: Arc<Identifier>,
+        topic: Arc<Identifier>,
+        partitioning: Option<Arc<Partitioning>>,
+    ) -> Result<(), IggyError> {
+        let shard_message = ShardMessage {
+            messages,
+            stream,
+            topic,
+            partitioning,
+        };
+        let batch_bytes = shard_message.get_size_bytes();
+
+        let mut reserved = self.current_buffered.load(Ordering::Relaxed);
+        if let Some(buffer_size) = &self.config.buffer_size {
+            if batch_bytes.as_bytes_usize() > buffer_size.as_bytes_usize() {
+                return Err(IggyError::BackgroundSendBufferOverflow);
+            }
+            loop {
+                if buffer_size.as_bytes_usize() != 0
+                    && reserved + batch_bytes.as_bytes_usize() > 
buffer_size.as_bytes_usize()
+                {
+                    match self.backpressure {
+                        Backpressure::Block => {
+                            self.notify.notified().await;
+                            continue;
+                        }
+                        Backpressure::BlockWithTimeout(t) => {
+                            if tokio::time::timeout(t.get_duration(), 
self.notify.notified())
+                                .await
+                                .is_err()
+                            {
+                                return Err(IggyError::BackgroundSendTimeout);
+                            }
+                            continue;
+                        }
+                        Backpressure::FailImmediately => {
+                            return 
Err(IggyError::BackgroundSendBufferOverflow);
+                        }
+                    };
+                }
+                match self.current_buffered.compare_exchange(
+                    reserved,
+                    reserved + batch_bytes.as_bytes_usize(),
+                    Ordering::AcqRel,
+                    Ordering::Acquire,
+                ) {
+                    Ok(_) => break,
+                    Err(v) => reserved = v,
+                }
+            }
+        }
+
+        let shard_ix = self.sharding.pick_shard(
+            &self.shards,
+            &shard_message.messages,
+            &shard_message.stream,
+            &shard_message.topic,
+        );
+        let shard = self.shards.get(shard_ix).unwrap();
+
+        let result = match self.backpressure {
+            Backpressure::Block => shard.send_with_block(shard_message).await,
+            Backpressure::BlockWithTimeout(t) => 
shard.send_with_timeout(shard_message, t).await,
+            Backpressure::FailImmediately => 
shard.send_with_fail(shard_message),
+        };
+        if result.is_err() {
+            self.current_buffered
+                .fetch_sub(batch_bytes.as_bytes_usize(), Ordering::Relaxed);
+        }
+        result
+    }
+}
diff --git a/core/sdk/src/clients/send_mode.rs 
b/core/sdk/src/clients/send_mode.rs
index 6ed07d54..63145db3 100644
--- a/core/sdk/src/clients/send_mode.rs
+++ b/core/sdk/src/clients/send_mode.rs
@@ -1,6 +1,6 @@
 use std::sync::{atomic::{AtomicUsize, Ordering}, Arc};
 
-use iggy_common::{Identifier, IggyDuration, IggyError, IggyMessage, 
Partitioning};
+use iggy_common::{Identifier, IggyByteSize, IggyDuration, IggyError, 
IggyMessage, Partitioning};
 use tokio::task::JoinHandle;
 use tracing::error;
 
@@ -30,6 +30,7 @@ pub struct BackgroundConfig {
     pub in_flight_timeout: Option<IggyDuration>,
     pub batch_size: Option<usize>,
     pub failure_mode: BackpressureMode,
+    pub buffer_size: Option<IggyByteSize>, // rename: maximum_buffer_size
 }
 
 pub struct shardMessage {

Reply via email to