This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch feat/add-background-send in repository https://gitbox.apache.org/repos/asf/iggy.git
commit c3069208311531049e4db5e45b93c2850195a70e Author: haze518 <[email protected]> AuthorDate: Thu May 29 20:45:53 2025 +0600 del --- core/common/src/error/iggy_error.rs | 2 + core/sdk/src/clients/mod.rs | 1 + core/sdk/src/clients/producer.rs | 120 +++++++------ core/sdk/src/clients/producer_dispatcher.rs | 260 ++++++++++++++++++++++++++++ core/sdk/src/clients/send_mode.rs | 3 +- 5 files changed, 323 insertions(+), 63 deletions(-) diff --git a/core/common/src/error/iggy_error.rs b/core/common/src/error/iggy_error.rs index 5e812288..9fe20359 100644 --- a/core/common/src/error/iggy_error.rs +++ b/core/common/src/error/iggy_error.rs @@ -369,6 +369,8 @@ pub enum IggyError { BackgroundSendBufferFull = 4053, #[error("Background worker disconnected")] BackgroundWorkerDisconnected = 4054, + #[error("Background send buffer overflow")] + BackgroundSendBufferOverflow = 4055, #[error("Invalid offset: {0}")] InvalidOffset(u64) = 4100, #[error("Consumer group with ID: {0} for topic with ID: {1} was not found.")] diff --git a/core/sdk/src/clients/mod.rs b/core/sdk/src/clients/mod.rs index 7067d2c2..6f8f5f7c 100644 --- a/core/sdk/src/clients/mod.rs +++ b/core/sdk/src/clients/mod.rs @@ -33,6 +33,7 @@ pub mod consumer_builder; pub mod producer; pub mod producer_builder; pub mod send_mode; +mod producer_dispatcher; const ORDERING: std::sync::atomic::Ordering = std::sync::atomic::Ordering::SeqCst; const MAX_BATCH_LENGTH: usize = 1000000; diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs index ba2ccc94..4ede13f3 100644 --- a/core/sdk/src/clients/producer.rs +++ b/core/sdk/src/clients/producer.rs @@ -1,4 +1,4 @@ -use super::send_mode::{shardMessage, BackgroundConfig, BackpressureMode, Dispatcher, SendMode}; +use super::send_mode::{BackgroundConfig, BackpressureMode, Dispatcher, SendMode, shardMessage}; /* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -70,10 +70,11 @@ pub struct ProducerCore { impl ProducerCore { pub async fn init(&self) -> Result<(), IggyError> { - if self.initialized.compare_exchange( - false, true, - Ordering::SeqCst, Ordering::SeqCst, - ).is_err() { + if self + .initialized + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_err() + { return Ok(()); } @@ -191,6 +192,7 @@ impl ProducerCore { let part = self.get_partitioning(stream, topic, &msgs, partitioning)?; + // todo add batch_size or batch_length let max = self.batch_length.unwrap_or(MAX_BATCH_LENGTH); if !self.can_send_immediately && self.linger_time_micros > 0 { @@ -376,7 +378,6 @@ impl ProducerCore { } } - pub trait ErrorCallback: Send + Sync + Debug { fn call(&self, error: IggyError, messages: Vec<IggyMessage>); } @@ -415,37 +416,37 @@ impl IggyProducer { error_callback: Option<Arc<dyn ErrorCallback>>, ) -> Self { let core = Arc::new(ProducerCore { - initialized: AtomicBool::new(true), - client: Arc::new(client), - can_send: Arc::new(AtomicBool::new(true)), - stream_id: Arc::new(stream), - stream_name, - topic_id: Arc::new(topic), - topic_name, - batch_length, - partitioning: partitioning.map(Arc::new), - encryptor, - partitioner, - linger_time_micros: linger_time.map_or(0, |i| i.as_micros()), - create_stream_if_not_exists, - create_topic_if_not_exists, - topic_partitions_count, - topic_replication_factor, - topic_message_expiry, - topic_max_size, - default_partitioning: Arc::new(Partitioning::balanced()), - can_send_immediately: linger_time.is_none(), - last_sent_at: Arc::new(AtomicU64::new(0)), - send_retries_count, - send_retries_interval, - _join_handle: None, - sema: Arc::new(Semaphore::new(10)), - sender: None, - error_callback, - shard_number: default_shard_count(), - }); + initialized: AtomicBool::new(true), + client: Arc::new(client), + can_send: Arc::new(AtomicBool::new(true)), + stream_id: Arc::new(stream), + stream_name, + topic_id: Arc::new(topic), + topic_name, + batch_length, + partitioning: partitioning.map(Arc::new), + encryptor, + partitioner, + linger_time_micros: linger_time.map_or(0, |i| i.as_micros()), + create_stream_if_not_exists, + create_topic_if_not_exists, + topic_partitions_count, + topic_replication_factor, + topic_message_expiry, + topic_max_size, + default_partitioning: Arc::new(Partitioning::balanced()), + can_send_immediately: linger_time.is_none(), + last_sent_at: Arc::new(AtomicU64::new(0)), + send_retries_count, + send_retries_interval, + _join_handle: None, + sema: Arc::new(Semaphore::new(10)), + sender: None, + error_callback, + shard_number: default_shard_count(), + }); let num_shards = default_shard_count(); // todo потом заменмить на норм значепние - let config = BackgroundConfig{ + let config = BackgroundConfig { max_in_flight: 4, in_flight_timeout: None, batch_size: None, @@ -489,24 +490,23 @@ impl IggyProducer { match &self.send_mode { SendMode::Sync => { - self.core.send_internal(&stream_id, &topic_id, messages, None).await - - } - SendMode::Background => { - match &self.dispatcher { - Some(disp) => { - disp.dispatch(shardMessage{ - messages, - stream: stream_id, - topic: topic_id, - partitioning: None, - }).await.map_err(|err| IggyError::BackgroundSendError) - } - None => Ok(()) - } + self.core + .send_internal(&stream_id, &topic_id, messages, None) + .await } + SendMode::Background => match &self.dispatcher { + Some(disp) => disp + .dispatch(shardMessage { + messages, + stream: stream_id, + topic: topic_id, + partitioning: None, + }) + .await + .map_err(|err| IggyError::BackgroundSendError), + None => Ok(()), + }, } - } pub async fn send_one(&self, message: IggyMessage) -> Result<(), IggyError> { @@ -526,7 +526,9 @@ impl IggyProducer { let stream_id = &self.core.stream_id; let topic_id = &self.core.topic_id; - self.core.send_internal(stream_id, topic_id, messages, partitioning).await + self.core + .send_internal(stream_id, topic_id, messages, partitioning) + .await } pub async fn send_to( @@ -541,7 +543,9 @@ impl IggyProducer { return Ok(()); } - self.core.send_internal(&stream, &topic, messages, partitioning).await + self.core + .send_internal(&stream, &topic, messages, partitioning) + .await } } @@ -549,11 +553,3 @@ fn default_shard_count() -> usize { let cpus = num_cpus::get(); cpus.clamp(2, 16) } - -fn default_sharder(msg: &IggyMessage) -> &[u8] { - if let Some(h) = &msg.user_headers { - return &h[..h.len().min(16)]; - } - - &msg.payload[..msg.payload.len().min(16)] -} diff --git a/core/sdk/src/clients/producer_dispatcher.rs b/core/sdk/src/clients/producer_dispatcher.rs new file mode 100644 index 00000000..2f6a25d7 --- /dev/null +++ b/core/sdk/src/clients/producer_dispatcher.rs @@ -0,0 +1,260 @@ +use std::sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, +}; + +use iggy_common::{ + Identifier, IggyByteSize, IggyDuration, IggyError, IggyMessage, Partitioning, Sizeable, +}; +use tokio::{sync::Notify, task::JoinHandle}; +use tracing::error; + +use super::{producer::ProducerCore, send_mode::BackgroundConfig}; + +#[derive(Debug)] +pub struct ShardMessage { + pub stream: Arc<Identifier>, + pub topic: Arc<Identifier>, + pub messages: Vec<IggyMessage>, + pub partitioning: Option<Arc<Partitioning>>, +} + +impl Sizeable for ShardMessage { + fn get_size_bytes(&self) -> IggyByteSize { + let mut total = IggyByteSize::new(0); + + total += self.stream.get_size_bytes(); + total += self.topic.get_size_bytes(); + for msg in &self.messages { + total += msg.get_size_bytes(); + } + total + } +} + +pub struct Shard { + tx: flume::Sender<ShardMessage>, + _handle: JoinHandle<()>, +} + +impl Shard { + pub fn new( + core: Arc<ProducerCore>, + current_buffered: Arc<AtomicUsize>, + notify: Arc<Notify>, + ) -> Self { + let (tx, rx) = flume::bounded::<ShardMessage>(10); // use from config + let handle = tokio::spawn(async move { + while let Ok(msg) = rx.recv_async().await { + let size = msg.get_size_bytes(); + if let Err(e) = core + .send_internal(&msg.stream, &msg.topic, msg.messages, msg.partitioning) + .await + { + // send to err chan + // error!("{:?}", e); + } + current_buffered.fetch_sub(size.as_bytes_usize(), Ordering::Relaxed); + notify.notify_waiters(); + } + }); + Self { + tx, + _handle: handle, + } + } + + async fn send_with_block(&self, message: ShardMessage) -> Result<(), IggyError> { + self.tx.send_async(message).await.map_err(|e| { + error!("Failed to send_with_block: {e}"); + IggyError::BackgroundSendError + }) + } + + async fn send_with_timeout( + &self, + message: ShardMessage, + timeout: IggyDuration, + ) -> Result<(), IggyError> { + match tokio::time::timeout(timeout.get_duration(), self.tx.send_async(message)).await { + Ok(Ok(())) => Ok(()), + Ok(Err(e)) => { + error!("Channel send failed during timeout: {e}"); + Err(IggyError::BackgroundSendTimeout) + } + Err(_) => { + error!("Timeout elapsed before sending message batch"); + Err(IggyError::BackgroundSendTimeout) + } + } + } + + fn send_with_fail(&self, message: ShardMessage) -> Result<(), IggyError> { + self.tx.try_send(message).map_err(|_| { + error!("Channel is full, dropping message batch"); + IggyError::BackgroundSendError + }) + } +} + +pub trait Sharding { + fn pick_shard( + &self, + shards: &[Shard], + messages: &[IggyMessage], + stream: &Identifier, + topic: &Identifier, + ) -> usize; +} + +pub struct RoundRobinSharding { + counter: AtomicUsize, +} + +impl Default for RoundRobinSharding { + fn default() -> Self { + Self { + counter: AtomicUsize::new(0), + } + } +} + +impl Sharding for RoundRobinSharding { + fn pick_shard( + &self, + shards: &[Shard], + _: &[IggyMessage], + _: &Identifier, + _: &Identifier, + ) -> usize { + self.counter.fetch_add(1, Ordering::Relaxed) % shards.len() + } +} + +pub enum Backpressure { + /// Block until the send succeeds + Block, + /// Block with a timeout, after which the send fails + BlockWithTimeout(IggyDuration), + /// Fail immediately without retrying + FailImmediately, +} + +struct ProducerDispatcher<S: Sharding> { + core: Arc<ProducerCore>, + backpressure: Backpressure, + sharding: S, + shards: Vec<Shard>, + current_buffered: Arc<AtomicUsize>, + notify: Arc<Notify>, + config: Arc<BackgroundConfig>, +} + +impl<S> ProducerDispatcher<S> +where + S: Sharding, +{ + pub fn new( + core: Arc<ProducerCore>, + backpressure: Backpressure, + config: Arc<BackgroundConfig>, + sharding: S, + ) -> Self { + let mut shards = Vec::with_capacity(config.max_in_flight); + let current_buffered = Arc::new(AtomicUsize::new(0)); + let notify = Arc::new(Notify::new()); + + for _ in 0..config.max_in_flight { + shards.push(Shard::new( + core.clone(), + current_buffered.clone(), + notify.clone(), + )); + } + + Self { + core, + backpressure, + sharding, + shards, + current_buffered, + config, + notify, + } + } + + pub async fn dispatch( + &self, + messages: Vec<IggyMessage>, + stream: Arc<Identifier>, + topic: Arc<Identifier>, + partitioning: Option<Arc<Partitioning>>, + ) -> Result<(), IggyError> { + let shard_message = ShardMessage { + messages, + stream, + topic, + partitioning, + }; + let batch_bytes = shard_message.get_size_bytes(); + + let mut reserved = self.current_buffered.load(Ordering::Relaxed); + if let Some(buffer_size) = &self.config.buffer_size { + if batch_bytes.as_bytes_usize() > buffer_size.as_bytes_usize() { + return Err(IggyError::BackgroundSendBufferOverflow); + } + loop { + if buffer_size.as_bytes_usize() != 0 + && reserved + batch_bytes.as_bytes_usize() > buffer_size.as_bytes_usize() + { + match self.backpressure { + Backpressure::Block => { + self.notify.notified().await; + continue; + } + Backpressure::BlockWithTimeout(t) => { + if tokio::time::timeout(t.get_duration(), self.notify.notified()) + .await + .is_err() + { + return Err(IggyError::BackgroundSendTimeout); + } + continue; + } + Backpressure::FailImmediately => { + return Err(IggyError::BackgroundSendBufferOverflow); + } + }; + } + match self.current_buffered.compare_exchange( + reserved, + reserved + batch_bytes.as_bytes_usize(), + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(v) => reserved = v, + } + } + } + + let shard_ix = self.sharding.pick_shard( + &self.shards, + &shard_message.messages, + &shard_message.stream, + &shard_message.topic, + ); + let shard = self.shards.get(shard_ix).unwrap(); + + let result = match self.backpressure { + Backpressure::Block => shard.send_with_block(shard_message).await, + Backpressure::BlockWithTimeout(t) => shard.send_with_timeout(shard_message, t).await, + Backpressure::FailImmediately => shard.send_with_fail(shard_message), + }; + if result.is_err() { + self.current_buffered + .fetch_sub(batch_bytes.as_bytes_usize(), Ordering::Relaxed); + } + result + } +} diff --git a/core/sdk/src/clients/send_mode.rs b/core/sdk/src/clients/send_mode.rs index 6ed07d54..63145db3 100644 --- a/core/sdk/src/clients/send_mode.rs +++ b/core/sdk/src/clients/send_mode.rs @@ -1,6 +1,6 @@ use std::sync::{atomic::{AtomicUsize, Ordering}, Arc}; -use iggy_common::{Identifier, IggyDuration, IggyError, IggyMessage, Partitioning}; +use iggy_common::{Identifier, IggyByteSize, IggyDuration, IggyError, IggyMessage, Partitioning}; use tokio::task::JoinHandle; use tracing::error; @@ -30,6 +30,7 @@ pub struct BackgroundConfig { pub in_flight_timeout: Option<IggyDuration>, pub batch_size: Option<usize>, pub failure_mode: BackpressureMode, + pub buffer_size: Option<IggyByteSize>, // rename: maximum_buffer_size } pub struct shardMessage {
