This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch feat/add-background-send in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 41e924464df29c9e0470403b226578261b7e65b4 Author: haze518 <[email protected]> AuthorDate: Thu Jun 5 14:01:00 2025 +0600 add background send option --- Cargo.lock | 2 + Cargo.toml | 4 - core/common/src/error/iggy_error.rs | 21 +- core/common/src/types/message/iggy_message.rs | 2 +- core/connectors/runtime/src/configs.rs | 4 +- core/connectors/runtime/src/sink.rs | 2 +- core/connectors/runtime/src/source.rs | 11 +- core/examples/src/multi-tenant/producer/main.rs | 8 +- core/examples/src/new-sdk/producer/main.rs | 9 +- core/examples/src/sink-data-producer/main.rs | 8 +- core/sdk/Cargo.toml | 4 + core/sdk/src/clients/mod.rs | 6 +- core/sdk/src/clients/producer.rs | 523 +++++++++++---------- core/sdk/src/clients/producer_builder.rs | 299 ++++++++++-- core/sdk/src/clients/producer_config.rs | 53 +++ core/sdk/src/clients/producer_dispatcher.rs | 447 ++++++++++++++++++ core/sdk/src/clients/producer_error_callback.rs | 67 +++ core/sdk/src/clients/producer_sharding.rs | 437 +++++++++++++++++ .../stream_builder/build/build_iggy_producer.rs | 7 +- 19 files changed, 1601 insertions(+), 313 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b1556a42..8ef9cd57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3591,6 +3591,8 @@ dependencies = [ "futures-util", "iggy_binary_protocol", "iggy_common", + "mockall", + "num_cpus", "quinn", "reqwest", "reqwest-middleware", diff --git a/Cargo.toml b/Cargo.toml index 1f2053f7..d9ea29f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,7 +80,6 @@ comfy-table = "7.1.4" crc32fast = "1.4.2" crossbeam = "0.8.4" dashmap = "6.1.0" -derive_builder = "0.20.2" derive_more = { version = "2.0.1", features = ["full"] } derive-new = "0.7.0" dirs = "6.0.0" @@ -171,7 +170,4 @@ iggy = { path = "core/sdk", version = "0.7.0" } server = { path = "core/server" } integration = { path = "core/integration" } bench-report = { path = "core/bench/report" } -bench-runner = { path = "core/bench/runner" } -bench-dashboard-frontend = { path = "core/bench/dashboard/frontend" } -bench-dashboard-server = { path = "core/bench/dashboard/server" } bench-dashboard-shared = { path = "core/bench/dashboard/shared" } diff --git a/core/common/src/error/iggy_error.rs b/core/common/src/error/iggy_error.rs index b293270b..45917460 100644 --- a/core/common/src/error/iggy_error.rs +++ b/core/common/src/error/iggy_error.rs @@ -16,8 +16,10 @@ * under the License. */ -use crate::utils::byte_size::IggyByteSize; +use std::sync::Arc; + use crate::utils::topic_size::MaxTopicSize; +use crate::{IggyMessage, utils::byte_size::IggyByteSize}; use strum::{EnumDiscriminants, FromRepr, IntoStaticStr}; use thiserror::Error; @@ -363,6 +365,23 @@ pub enum IggyError { TooSmallMessage(u32, u32) = 4037, #[error("Cannot sed messages due to client disconnection")] CannotSendMessagesDueToClientDisconnection = 4050, + #[error("Background send error")] + BackgroundSendError = 4051, + #[error("Background send timeout")] + BackgroundSendTimeout = 4052, + #[error("Background send buffer is full")] + BackgroundSendBufferFull = 4053, + #[error("Background worker disconnected")] + BackgroundWorkerDisconnected = 4054, + #[error("Background send buffer overflow")] + BackgroundSendBufferOverflow = 4055, + #[error("Producer send failed")] + ProducerSendFailed { + cause: String, + failed: Arc<Vec<IggyMessage>>, + } = 4056, + #[error("Producer closed")] + ProducerClosed = 4057, #[error("Invalid offset: {0}")] InvalidOffset(u64) = 4100, #[error("Consumer group with ID: {0} for topic with ID: {1} was not found.")] diff --git a/core/common/src/types/message/iggy_message.rs b/core/common/src/types/message/iggy_message.rs index 54c3a841..ddd4076f 100644 --- a/core/common/src/types/message/iggy_message.rs +++ b/core/common/src/types/message/iggy_message.rs @@ -105,7 +105,7 @@ pub const MAX_USER_HEADERS_SIZE: u32 = 100 * 1000; /// .build() /// .unwrap(); /// ``` -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Default)] pub struct IggyMessage { /// Message metadata pub header: IggyMessageHeader, diff --git a/core/connectors/runtime/src/configs.rs b/core/connectors/runtime/src/configs.rs index aa6d8fa8..aafc9a48 100644 --- a/core/connectors/runtime/src/configs.rs +++ b/core/connectors/runtime/src/configs.rs @@ -60,8 +60,8 @@ pub struct StreamProducerConfig { pub stream: String, pub topic: String, pub schema: Schema, - pub batch_size: Option<u32>, - pub send_interval: Option<String>, + pub batch_length: Option<u32>, + pub linger_time: Option<String>, } #[derive(Debug, Serialize, Deserialize)] diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs index 17b014eb..78008011 100644 --- a/core/connectors/runtime/src/sink.rs +++ b/core/connectors/runtime/src/sink.rs @@ -125,7 +125,7 @@ pub async fn init( .auto_join_consumer_group() .polling_strategy(PollingStrategy::next()) .poll_interval(poll_interval) - .batch_size(batch_size) + .batch_length(batch_size) .build(); consumer.init().await?; diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs index 9cde82ca..ac903a6c 100644 --- a/core/connectors/runtime/src/source.rs +++ b/core/connectors/runtime/src/source.rs @@ -112,14 +112,13 @@ pub async fn init( .expect("Failed to get source plugin"); for stream in config.streams { - let send_interval = - IggyDuration::from_str(&stream.send_interval.unwrap_or("5ms".to_owned())) + let linger_time = + IggyDuration::from_str(&stream.linger_time.unwrap_or("5ms".to_owned())) .expect("Invalid send interval"); - let batch_size = stream.batch_size.unwrap_or(1000); - let mut producer = iggy_client + let batch_length = stream.batch_length.unwrap_or(1000); + let producer = iggy_client .producer(&stream.stream, &stream.topic)? - .send_interval(send_interval) - .batch_size(batch_size) + .sync(|b| b.batch_length(batch_length).linger_time(linger_time)) .build(); producer.init().await?; diff --git a/core/examples/src/multi-tenant/producer/main.rs b/core/examples/src/multi-tenant/producer/main.rs index 28dc9759..5867cbb8 100644 --- a/core/examples/src/multi-tenant/producer/main.rs +++ b/core/examples/src/multi-tenant/producer/main.rs @@ -260,10 +260,12 @@ async fn create_producers( let mut producers = Vec::new(); for topic in topics { for id in 1..=producers_count { - let mut producer = client + let producer = client .producer(stream, topic)? - .batch_length(batch_length) - .linger_time(IggyDuration::from_str(interval).expect("Invalid duration")) + .sync(|b| { + b.batch_length(batch_length) + .linger_time(IggyDuration::from_str(interval).expect("Invalid duration")) + }) .partitioning(Partitioning::balanced()) .create_topic_if_not_exists( partitions_count, diff --git a/core/examples/src/new-sdk/producer/main.rs b/core/examples/src/new-sdk/producer/main.rs index 52f60e7c..c30f3e4d 100644 --- a/core/examples/src/new-sdk/producer/main.rs +++ b/core/examples/src/new-sdk/producer/main.rs @@ -43,10 +43,13 @@ async fn main() -> anyhow::Result<(), Box<dyn Error>> { let client = client_provider::get_raw_client(client_provider_config, false).await?; let client = IggyClient::builder().with_client(client).build()?; client.connect().await?; - let mut producer = client + let interval = IggyDuration::from_str(&args.interval)?; + let producer = client .producer(&args.stream_id, &args.topic_id)? - .batch_length(args.messages_per_batch) - .linger_time(IggyDuration::from_str(&args.interval)?) + .sync(|b| { + b.batch_length(args.messages_per_batch) + .linger_time(interval) + }) .partitioning(Partitioning::balanced()) .create_topic_if_not_exists( 3, diff --git a/core/examples/src/sink-data-producer/main.rs b/core/examples/src/sink-data-producer/main.rs index 2607d955..f678bb5f 100644 --- a/core/examples/src/sink-data-producer/main.rs +++ b/core/examples/src/sink-data-producer/main.rs @@ -55,10 +55,12 @@ async fn main() -> Result<(), DataProducerError> { let stream = env::var("IGGY_STREAM").unwrap_or("qw".to_owned()); let topic = env::var("IGGY_TOPIC").unwrap_or("records".to_owned()); let client = create_client(&address, &username, &password).await?; - let mut producer = client + let producer = client .producer(&stream, &topic)? - .batch_size(1000) - .send_interval(IggyDuration::from_str("5ms").unwrap()) + .sync(|b| { + b.batch_length(1000) + .linger_time(IggyDuration::from_str("5ms").unwrap()) + }) .partitioning(Partitioning::balanced()) .build(); producer.init().await?; diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml index c763af12..ebc8f1b4 100644 --- a/core/sdk/Cargo.toml +++ b/core/sdk/Cargo.toml @@ -48,6 +48,7 @@ futures = { workspace = true } futures-util = { workspace = true } iggy_binary_protocol = { workspace = true } iggy_common = { workspace = true } +num_cpus = "1.16.0" quinn = { workspace = true } reqwest = { workspace = true } reqwest-middleware = { workspace = true } @@ -59,3 +60,6 @@ tokio-rustls = { workspace = true } tracing = { workspace = true } trait-variant = { workspace = true } webpki-roots = { workspace = true } + +[dev-dependencies] +mockall = { workspace = true } diff --git a/core/sdk/src/clients/mod.rs b/core/sdk/src/clients/mod.rs index be854f1b..e50bc443 100644 --- a/core/sdk/src/clients/mod.rs +++ b/core/sdk/src/clients/mod.rs @@ -32,6 +32,10 @@ pub mod consumer; pub mod consumer_builder; pub mod producer; pub mod producer_builder; +pub mod producer_config; +pub mod producer_dispatcher; +pub mod producer_error_callback; +pub mod producer_sharding; const ORDERING: std::sync::atomic::Ordering = std::sync::atomic::Ordering::SeqCst; -const MAX_BATCH_SIZE: usize = 1000000; +const MAX_BATCH_LENGTH: usize = 1000000; diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs index f3706e3a..58f38e2b 100644 --- a/core/sdk/src/clients/producer.rs +++ b/core/sdk/src/clients/producer.rs @@ -15,8 +15,11 @@ * specific language governing permissions and limitations * under the License. */ -use super::{MAX_BATCH_SIZE, ORDERING}; +use super::ORDERING; +use crate::clients::producer_builder::SendMode; +use crate::clients::producer_config::SyncConfig; +use crate::clients::producer_dispatcher::ProducerDispatcher; use bytes::Bytes; use futures_util::StreamExt; use iggy_binary_protocol::Client; @@ -26,27 +29,37 @@ use iggy_common::{ IggyError, IggyExpiry, IggyMessage, IggyTimestamp, MaxTopicSize, Partitioner, Partitioning, }; use std::sync::Arc; +use std::sync::atomic::Ordering; use std::sync::atomic::{AtomicBool, AtomicU64}; use std::time::Duration; use tokio::time::{Interval, sleep}; use tracing::{error, info, trace, warn}; -unsafe impl Send for IggyProducer {} -unsafe impl Sync for IggyProducer {} +#[cfg(test)] +use mockall::automock; -pub struct IggyProducer { - initialized: bool, +#[cfg_attr(test, automock)] +pub trait ProducerCoreBackend: Send + Sync + 'static { + fn send_internal( + &self, + stream: &Identifier, + topic: &Identifier, + msgs: Vec<IggyMessage>, + partitioning: Option<Arc<Partitioning>>, + ) -> impl Future<Output = Result<(), IggyError>> + Send; +} + +pub struct ProducerCore { + initialized: AtomicBool, can_send: Arc<AtomicBool>, client: Arc<IggySharedMut<Box<dyn Client>>>, stream_id: Arc<Identifier>, stream_name: String, topic_id: Arc<Identifier>, topic_name: String, - batch_length: Option<usize>, partitioning: Option<Arc<Partitioning>>, encryptor: Option<Arc<EncryptorKind>>, partitioner: Option<Arc<dyn Partitioner>>, - linger_time_micros: u64, create_stream_if_not_exists: bool, create_topic_if_not_exists: bool, topic_partitions_count: u32, @@ -54,74 +67,15 @@ pub struct IggyProducer { topic_message_expiry: IggyExpiry, topic_max_size: MaxTopicSize, default_partitioning: Arc<Partitioning>, - can_send_immediately: bool, last_sent_at: Arc<AtomicU64>, send_retries_count: Option<u32>, send_retries_interval: Option<IggyDuration>, + sync_config: Option<SyncConfig>, } -impl IggyProducer { - #[allow(clippy::too_many_arguments)] - pub(crate) fn new( - client: IggySharedMut<Box<dyn Client>>, - stream: Identifier, - stream_name: String, - topic: Identifier, - topic_name: String, - batch_length: Option<usize>, - partitioning: Option<Partitioning>, - encryptor: Option<Arc<EncryptorKind>>, - partitioner: Option<Arc<dyn Partitioner>>, - interval: Option<IggyDuration>, - create_stream_if_not_exists: bool, - create_topic_if_not_exists: bool, - topic_partitions_count: u32, - topic_replication_factor: Option<u8>, - topic_message_expiry: IggyExpiry, - topic_max_size: MaxTopicSize, - send_retries_count: Option<u32>, - send_retries_interval: Option<IggyDuration>, - ) -> Self { - Self { - initialized: false, - client: Arc::new(client), - can_send: Arc::new(AtomicBool::new(true)), - stream_id: Arc::new(stream), - stream_name, - topic_id: Arc::new(topic), - topic_name, - batch_length, - partitioning: partitioning.map(Arc::new), - encryptor, - partitioner, - linger_time_micros: interval.map_or(0, |i| i.as_micros()), - create_stream_if_not_exists, - create_topic_if_not_exists, - topic_partitions_count, - topic_replication_factor, - topic_message_expiry, - topic_max_size, - default_partitioning: Arc::new(Partitioning::balanced()), - can_send_immediately: interval.is_none(), - last_sent_at: Arc::new(AtomicU64::new(0)), - send_retries_count, - send_retries_interval, - } - } - - pub fn stream(&self) -> &Identifier { - &self.stream_id - } - - pub fn topic(&self) -> &Identifier { - &self.topic_id - } - - /// Initializes the producer by subscribing to diagnostic events, creating the stream and topic if they do not exist etc. - /// - /// Note: This method must be invoked before producing messages. - pub async fn init(&mut self) -> Result<(), IggyError> { - if self.initialized { +impl ProducerCore { + pub async fn init(&self) -> Result<(), IggyError> { + if self.initialized.load(Ordering::SeqCst) { return Ok(()); } @@ -179,7 +133,9 @@ impl IggyProducer { .await?; } - self.initialized = true; + let _ = self + .initialized + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst); info!("Producer has been initialized for stream: {stream_id} and topic: {topic_id}."); Ok(()) } @@ -221,171 +177,6 @@ impl IggyProducer { }); } - pub async fn send(&self, messages: Vec<IggyMessage>) -> Result<(), IggyError> { - if messages.is_empty() { - trace!("No messages to send."); - return Ok(()); - } - - if self.can_send_immediately { - return self - .send_immediately(&self.stream_id, &self.topic_id, messages, None) - .await; - } - - self.send_buffered( - self.stream_id.clone(), - self.topic_id.clone(), - messages, - None, - ) - .await - } - - pub async fn send_one(&self, message: IggyMessage) -> Result<(), IggyError> { - self.send(vec![message]).await - } - - pub async fn send_with_partitioning( - &self, - messages: Vec<IggyMessage>, - partitioning: Option<Arc<Partitioning>>, - ) -> Result<(), IggyError> { - if messages.is_empty() { - trace!("No messages to send."); - return Ok(()); - } - - if self.can_send_immediately { - return self - .send_immediately(&self.stream_id, &self.topic_id, messages, partitioning) - .await; - } - - self.send_buffered( - self.stream_id.clone(), - self.topic_id.clone(), - messages, - partitioning, - ) - .await - } - - pub async fn send_to( - &self, - stream: Arc<Identifier>, - topic: Arc<Identifier>, - messages: Vec<IggyMessage>, - partitioning: Option<Arc<Partitioning>>, - ) -> Result<(), IggyError> { - if messages.is_empty() { - trace!("No messages to send."); - return Ok(()); - } - - if self.can_send_immediately { - return self - .send_immediately(&self.stream_id, &self.topic_id, messages, partitioning) - .await; - } - - self.send_buffered(stream, topic, messages, partitioning) - .await - } - - async fn send_buffered( - &self, - stream: Arc<Identifier>, - topic: Arc<Identifier>, - mut messages: Vec<IggyMessage>, - partitioning: Option<Arc<Partitioning>>, - ) -> Result<(), IggyError> { - self.encrypt_messages(&mut messages)?; - let partitioning = self.get_partitioning(&stream, &topic, &messages, partitioning)?; - let batch_length = self.batch_length.unwrap_or(MAX_BATCH_SIZE); - let batches = messages.chunks_mut(batch_length); - let mut current_batch = 1; - let batches_count = batches.len(); - for batch in batches { - if self.linger_time_micros > 0 { - Self::wait_before_sending( - self.linger_time_micros, - self.last_sent_at.load(ORDERING), - ) - .await; - } - - let messages_count = batch.len(); - trace!( - "Sending {messages_count} messages ({current_batch}/{batches_count} batch(es))..." - ); - self.last_sent_at - .store(IggyTimestamp::now().into(), ORDERING); - self.try_send_messages(&self.stream_id, &self.topic_id, &partitioning, batch) - .await?; - trace!("Sent {messages_count} messages ({current_batch}/{batches_count} batch(es))."); - current_batch += 1; - } - Ok(()) - } - - async fn send_immediately( - &self, - stream: &Identifier, - topic: &Identifier, - mut messages: Vec<IggyMessage>, - partitioning: Option<Arc<Partitioning>>, - ) -> Result<(), IggyError> { - trace!("No batch size specified, sending messages immediately."); - self.encrypt_messages(&mut messages)?; - let partitioning = self.get_partitioning(stream, topic, &messages, partitioning)?; - let batch_length = self.batch_length.unwrap_or(MAX_BATCH_SIZE); - if messages.len() <= batch_length { - self.last_sent_at - .store(IggyTimestamp::now().into(), ORDERING); - self.try_send_messages(stream, topic, &partitioning, &mut messages) - .await?; - return Ok(()); - } - - for batch in messages.chunks_mut(batch_length) { - self.last_sent_at - .store(IggyTimestamp::now().into(), ORDERING); - self.try_send_messages(stream, topic, &partitioning, batch) - .await?; - } - Ok(()) - } - - async fn wait_before_sending(interval: u64, last_sent_at: u64) { - if interval == 0 { - return; - } - - let now: u64 = IggyTimestamp::now().into(); - let elapsed = now - last_sent_at; - if elapsed >= interval { - trace!("No need to wait before sending messages. {now} - {last_sent_at} = {elapsed}"); - return; - } - - let remaining = interval - elapsed; - trace!( - "Waiting for {remaining} microseconds before sending messages... {interval} - {elapsed} = {remaining}" - ); - sleep(Duration::from_micros(remaining)).await; - } - - fn encrypt_messages(&self, messages: &mut [IggyMessage]) -> Result<(), IggyError> { - if let Some(encryptor) = &self.encryptor { - for message in messages { - message.payload = Bytes::from(encryptor.encrypt(&message.payload)?); - message.header.payload_length = message.payload.len() as u32; - } - } - Ok(()) - } - async fn try_send_messages( &self, stream: &Identifier, @@ -505,6 +296,16 @@ impl IggyProducer { } } + fn encrypt_messages(&self, messages: &mut [IggyMessage]) -> Result<(), IggyError> { + if let Some(encryptor) = &self.encryptor { + for message in messages { + message.payload = Bytes::from(encryptor.encrypt(&message.payload)?); + message.header.payload_length = message.payload.len() as u32; + } + } + Ok(()) + } + fn get_partitioning( &self, stream: &Identifier, @@ -525,4 +326,254 @@ impl IggyProducer { })) } } + + async fn wait_before_sending(interval: u64, last_sent_at: u64) { + if interval == 0 { + return; + } + + let now: u64 = IggyTimestamp::now().into(); + let elapsed = now - last_sent_at; + if elapsed >= interval { + trace!("No need to wait before sending messages. {now} - {last_sent_at} = {elapsed}"); + return; + } + + let remaining = interval - elapsed; + trace!( + "Waiting for {remaining} microseconds before sending messages... {interval} - {elapsed} = {remaining}" + ); + sleep(Duration::from_micros(remaining)).await; + } +} + +impl ProducerCoreBackend for ProducerCore { + async fn send_internal( + &self, + stream: &Identifier, + topic: &Identifier, + mut msgs: Vec<IggyMessage>, + partitioning: Option<Arc<Partitioning>>, + ) -> Result<(), IggyError> { + if msgs.is_empty() { + return Ok(()); + } + + if let Err(err) = self.encrypt_messages(&mut msgs) { + return Err(IggyError::ProducerSendFailed { + cause: err.to_string(), + failed: Arc::new(msgs), + }); + } + + let part = match self.get_partitioning(stream, topic, &msgs, partitioning.clone()) { + Ok(p) => p, + Err(err) => { + return Err(IggyError::ProducerSendFailed { + cause: err.to_string(), + failed: Arc::new(msgs), + }); + } + }; + + match &self.sync_config { + Some(cfg) => { + let linger_time_micros = match cfg.linger_time { + Some(t) => t.as_micros(), + None => 0, + }; + if linger_time_micros > 0 { + Self::wait_before_sending(linger_time_micros, self.last_sent_at.load(ORDERING)) + .await; + } + + let max = cfg.batch_length; + let mut index = 0; + while index < msgs.len() { + let end = (index + max).min(msgs.len()); + let chunk = &mut msgs[index..end]; + + if let Err(err) = self.try_send_messages(stream, topic, &part, chunk).await { + let failed_tail = msgs.split_off(index); + return Err(IggyError::ProducerSendFailed { + cause: err.to_string(), + failed: Arc::new(failed_tail), + }); + } + self.last_sent_at + .store(IggyTimestamp::now().into(), ORDERING); + index = end; + } + } + // background send on + _ => { + self.try_send_messages(stream, topic, &part, &mut msgs) + .await + .map_err(|err| IggyError::ProducerSendFailed { + cause: err.to_string(), + failed: Arc::new(msgs), + })?; + self.last_sent_at + .store(IggyTimestamp::now().into(), ORDERING); + } + } + + Ok(()) + } +} + +unsafe impl Send for IggyProducer {} +unsafe impl Sync for IggyProducer {} + +pub struct IggyProducer { + core: Arc<ProducerCore>, + dispatcher: Option<ProducerDispatcher>, +} + +impl IggyProducer { + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( + client: IggySharedMut<Box<dyn Client>>, + stream: Identifier, + stream_name: String, + topic: Identifier, + topic_name: String, + partitioning: Option<Partitioning>, + encryptor: Option<Arc<EncryptorKind>>, + partitioner: Option<Arc<dyn Partitioner>>, + create_stream_if_not_exists: bool, + create_topic_if_not_exists: bool, + topic_partitions_count: u32, + topic_replication_factor: Option<u8>, + topic_message_expiry: IggyExpiry, + topic_max_size: MaxTopicSize, + send_retries_count: Option<u32>, + send_retries_interval: Option<IggyDuration>, + mode: SendMode, + ) -> Self { + let core = Arc::new(ProducerCore { + initialized: AtomicBool::new(true), + client: Arc::new(client), + can_send: Arc::new(AtomicBool::new(true)), + stream_id: Arc::new(stream), + stream_name, + topic_id: Arc::new(topic), + topic_name, + partitioning: partitioning.map(Arc::new), + encryptor, + partitioner, + create_stream_if_not_exists, + create_topic_if_not_exists, + topic_partitions_count, + topic_replication_factor, + topic_message_expiry, + topic_max_size, + default_partitioning: Arc::new(Partitioning::balanced()), + last_sent_at: Arc::new(AtomicU64::new(0)), + send_retries_count, + send_retries_interval, + sync_config: match mode { + SendMode::Sync(ref cfg) => Some(cfg.clone()), + _ => None, + }, + }); + let dispatcher = match mode { + SendMode::Background(cfg) => Some(ProducerDispatcher::new(core.clone(), cfg)), + _ => None, + }; + + Self { core, dispatcher } + } + + pub fn stream(&self) -> &Identifier { + &self.core.stream_id + } + + pub fn topic(&self) -> &Identifier { + &self.core.topic_id + } + + /// Initializes the producer by subscribing to diagnostic events, creating the stream and topic if they do not exist etc. + /// + /// Note: This method must be invoked before producing messages. + pub async fn init(&self) -> Result<(), IggyError> { + self.core.init().await + } + + pub async fn send(&self, messages: Vec<IggyMessage>) -> Result<(), IggyError> { + if messages.is_empty() { + trace!("No messages to send."); + return Ok(()); + } + + let stream_id = self.core.stream_id.clone(); + let topic_id = self.core.topic_id.clone(); + + match &self.dispatcher { + Some(disp) => disp.dispatch(messages, stream_id, topic_id, None).await, + None => { + self.core + .send_internal(&stream_id, &topic_id, messages, None) + .await + } + } + } + + pub async fn send_one(&self, message: IggyMessage) -> Result<(), IggyError> { + self.send(vec![message]).await + } + + pub async fn send_with_partitioning( + &self, + messages: Vec<IggyMessage>, + partitioning: Option<Arc<Partitioning>>, + ) -> Result<(), IggyError> { + if messages.is_empty() { + trace!("No messages to send."); + return Ok(()); + } + + let stream_id = self.core.stream_id.clone(); + let topic_id = self.core.topic_id.clone(); + + match &self.dispatcher { + Some(disp) => { + disp.dispatch(messages, stream_id, topic_id, partitioning) + .await + } + None => { + self.core + .send_internal(&stream_id, &topic_id, messages, partitioning) + .await + } + } + } + + pub async fn send_to( + &self, + stream: Arc<Identifier>, + topic: Arc<Identifier>, + messages: Vec<IggyMessage>, + partitioning: Option<Arc<Partitioning>>, + ) -> Result<(), IggyError> { + if messages.is_empty() { + trace!("No messages to send."); + return Ok(()); + } + + match &self.dispatcher { + Some(disp) => disp.dispatch(messages, stream, topic, partitioning).await, + None => { + self.core + .send_internal(&stream, &topic, messages, partitioning) + .await + } + } + } + + pub async fn shutdown(self) { + if let Some(disp) = self.dispatcher { + disp.shutdown().await; + } + } } diff --git a/core/sdk/src/clients/producer_builder.rs b/core/sdk/src/clients/producer_builder.rs index 76ecfeea..431f94ee 100644 --- a/core/sdk/src/clients/producer_builder.rs +++ b/core/sdk/src/clients/producer_builder.rs @@ -15,27 +15,234 @@ // specific language governing permissions and limitations // under the License. -use super::MAX_BATCH_SIZE; +use super::MAX_BATCH_LENGTH; +use crate::clients::producer_config::{BackgroundConfig, BackpressureMode, SyncConfig}; +use crate::clients::producer_error_callback::ErrorCallback; +use crate::clients::producer_error_callback::LogErrorCallback; +use crate::clients::producer_sharding::{BalancedSharding, Sharding}; use crate::prelude::IggyProducer; use iggy_binary_protocol::Client; use iggy_common::locking::IggySharedMut; use iggy_common::{ - EncryptorKind, Identifier, IggyDuration, IggyExpiry, MaxTopicSize, Partitioner, Partitioning, + EncryptorKind, Identifier, IggyByteSize, IggyDuration, IggyExpiry, MaxTopicSize, Partitioner, + Partitioning, }; use std::sync::Arc; -#[derive(Debug)] +pub struct BackgroundBuilder { + num_shards: Option<usize>, + batch_size: Option<usize>, + batch_length: Option<usize>, + failure_mode: Option<BackpressureMode>, + max_buffer_size: Option<IggyByteSize>, + linger_time: Option<IggyDuration>, + max_in_flight: Option<usize>, + + error_callback: Box<dyn ErrorCallback>, + sharding: Box<dyn Sharding>, +} + +impl Default for BackgroundBuilder { + fn default() -> Self { + let num_shards = default_shard_count(); + BackgroundBuilder { + num_shards: Some(num_shards), + sharding: Box::new(BalancedSharding::default()), + error_callback: Box::new(LogErrorCallback), + batch_size: Some(1_048_576), + batch_length: Some(1000), + failure_mode: Some(BackpressureMode::Block), + max_buffer_size: Some(IggyByteSize::from(32 * 1_048_576)), + linger_time: Some(IggyDuration::from(1000)), + max_in_flight: Some(num_shards * num_shards), + } + } +} + +impl BackgroundBuilder { + /// Sets the number of messages to batch before sending them, can be combined with `interval`. + pub fn batch_length(self, batch_length: u32) -> Self { + Self { + batch_length: if batch_length == 0 { + None + } else { + Some(batch_length.min(MAX_BATCH_LENGTH as u32) as usize) + }, + ..self + } + } + + /// Clears the batch size. + pub fn without_batch_length(self) -> Self { + Self { + batch_length: None, + ..self + } + } + + /// Sets the interval between sending the messages, can be combined with `batch_length`. + pub fn linger_time(self, interval: IggyDuration) -> Self { + Self { + linger_time: Some(interval), + ..self + } + } + + /// Clears the interval. + pub fn without_linger_time(self) -> Self { + Self { + linger_time: None, + ..self + } + } + + /// Sets the number of shards (background workers). + pub fn num_shards(self, value: usize) -> Self { + Self { + num_shards: Some(value), + ..self + } + } + + /// Sets the maximum size of a batch in bytes. + pub fn batch_size(self, value: usize) -> Self { + Self { + batch_size: Some(value), + ..self + } + } + + /// Sets the sharding strategy. + /// You can pass a custom implementation that implements the `Sharding` trait. + pub fn sharding(self, sharding: Box<dyn Sharding>) -> Self { + Self { sharding, ..self } + } + + /// Sets the maximum buffer size for all in-flight messages (in bytes). + pub fn max_buffer_size(self, value: IggyByteSize) -> Self { + Self { + max_buffer_size: Some(value), + ..self + } + } + + /// Sets the failure mode behavior (e.g., block, fail immediately, timeout). + pub fn failure_mode(self, mode: BackpressureMode) -> Self { + Self { + failure_mode: Some(mode), + ..self + } + } + + /// Sets the error callback for handling background sending errors. + pub fn error_callback(self, callback: Box<dyn ErrorCallback>) -> Self { + Self { + error_callback: callback, + ..self + } + } + + /// Sets the maximum number of in-flight batches/messages. + pub fn max_in_flight(self, value: usize) -> Self { + Self { + max_in_flight: Some(value), + ..self + } + } + + pub fn build(self) -> BackgroundConfig { + BackgroundConfig { + num_shards: self.num_shards.unwrap_or(8), + batch_size: self.batch_size, + batch_length: self.batch_length, + failure_mode: self.failure_mode.unwrap_or(BackpressureMode::Block), + max_buffer_size: self.max_buffer_size, + linger_time: self.linger_time.unwrap_or(IggyDuration::from(1000)), + error_callback: Arc::new(self.error_callback), + sharding: self.sharding, + max_in_flight: self.max_in_flight, + } + } +} + +pub struct SyncBuilder { + batch_length: Option<usize>, + linger_time: Option<IggyDuration>, +} + +impl Default for SyncBuilder { + fn default() -> Self { + Self { + batch_length: Some(1000), + linger_time: Some(IggyDuration::from(1000)), + } + } +} + +impl SyncBuilder { + /// Sets the number of messages to batch before sending them, can be combined with `interval`. + pub fn batch_length(self, batch_length: u32) -> Self { + Self { + batch_length: if batch_length == 0 { + None + } else { + Some(batch_length.min(MAX_BATCH_LENGTH as u32) as usize) + }, + ..self + } + } + + /// Clears the batch size. + pub fn without_batch_length(self) -> Self { + Self { + batch_length: None, + ..self + } + } + + /// Sets the interval between sending the messages, can be combined with `batch_length`. + pub fn linger_time(self, interval: IggyDuration) -> Self { + Self { + linger_time: Some(interval), + ..self + } + } + + /// Clears the interval. + pub fn without_linger_time(self) -> Self { + Self { + linger_time: None, + ..self + } + } + + pub fn build(self) -> SyncConfig { + SyncConfig { + batch_length: self.batch_length.unwrap_or(MAX_BATCH_LENGTH), + linger_time: self.linger_time, + } + } +} + +pub enum SendMode { + Sync(SyncConfig), + Background(BackgroundConfig), +} + +impl Default for SendMode { + fn default() -> Self { + SendMode::Sync(SyncBuilder::default().build()) + } +} + pub struct IggyProducerBuilder { client: IggySharedMut<Box<dyn Client>>, stream: Identifier, stream_name: String, topic: Identifier, topic_name: String, - batch_length: Option<usize>, - partitioning: Option<Partitioning>, encryptor: Option<Arc<EncryptorKind>>, partitioner: Option<Arc<dyn Partitioner>>, - linger_time: Option<IggyDuration>, create_stream_if_not_exists: bool, create_topic_if_not_exists: bool, topic_partitions_count: u32, @@ -44,6 +251,8 @@ pub struct IggyProducerBuilder { send_retries_interval: Option<IggyDuration>, topic_message_expiry: IggyExpiry, topic_max_size: MaxTopicSize, + partitioning: Option<Partitioning>, + mode: SendMode, } impl IggyProducerBuilder { @@ -63,11 +272,9 @@ impl IggyProducerBuilder { stream_name, topic, topic_name, - batch_length: Some(1000), partitioning: None, encryptor, partitioner, - linger_time: Some(IggyDuration::from(1000)), create_stream_if_not_exists: true, create_topic_if_not_exists: true, topic_partitions_count: 1, @@ -76,6 +283,7 @@ impl IggyProducerBuilder { topic_max_size: MaxTopicSize::ServerDefault, send_retries_count: Some(3), send_retries_interval: Some(IggyDuration::ONE_SECOND), + mode: SendMode::default(), } } @@ -89,42 +297,6 @@ impl IggyProducerBuilder { Self { topic, ..self } } - /// Sets the number of messages to batch before sending them, can be combined with `interval`. - pub fn batch_length(self, batch_length: u32) -> Self { - Self { - batch_length: if batch_length == 0 { - None - } else { - Some(batch_length.min(MAX_BATCH_SIZE as u32) as usize) - }, - ..self - } - } - - /// Clears the batch size. - pub fn without_batch_length(self) -> Self { - Self { - batch_length: None, - ..self - } - } - - /// Sets the interval between sending the messages, can be combined with `batch_length`. - pub fn linger_time(self, interval: IggyDuration) -> Self { - Self { - linger_time: Some(interval), - ..self - } - } - - /// Clears the interval. - pub fn without_linger_time(self) -> Self { - Self { - linger_time: None, - ..self - } - } - /// Sets the encryptor for encrypting the messages' payloads. pub fn encryptor(self, encryptor: Arc<EncryptorKind>) -> Self { Self { @@ -226,9 +398,36 @@ impl IggyProducerBuilder { } } - /// Builds the producer. + /// Configures the producer to use synchronous (immediate) sending mode. + /// + /// In sync mode, messages are sent immediately on `.send()` without background buffering. + /// + /// # Arguments + /// * `f` - A closure that modifies the `SyncBuilder` configuration. + pub fn sync<F>(mut self, f: F) -> Self + where + F: FnOnce(SyncBuilder) -> SyncBuilder, + { + let cfg = f(SyncBuilder::default()).build(); + self.mode = SendMode::Sync(cfg); + self + } + + /// Configures the producer to use background (asynchronous) sending mode. /// - /// Note: After building the producer, `init()` must be invoked before producing messages. + /// In background mode, messages are buffered and sent in batches via background tasks. + /// + /// # Arguments + /// * `f` - A closure that modifies the `BackgroundBuilder` configuration. + pub fn background<F>(mut self, f: F) -> Self + where + F: FnOnce(BackgroundBuilder) -> BackgroundBuilder, + { + let cfg = f(BackgroundBuilder::default()).build(); + self.mode = SendMode::Background(cfg); + self + } + pub fn build(self) -> IggyProducer { IggyProducer::new( self.client, @@ -236,11 +435,9 @@ impl IggyProducerBuilder { self.stream_name, self.topic, self.topic_name, - self.batch_length, self.partitioning, self.encryptor, self.partitioner, - self.linger_time, self.create_stream_if_not_exists, self.create_topic_if_not_exists, self.topic_partitions_count, @@ -249,6 +446,12 @@ impl IggyProducerBuilder { self.topic_max_size, self.send_retries_count, self.send_retries_interval, + self.mode, ) } } + +fn default_shard_count() -> usize { + let cpus = num_cpus::get(); + cpus.clamp(2, 16) +} diff --git a/core/sdk/src/clients/producer_config.rs b/core/sdk/src/clients/producer_config.rs new file mode 100644 index 00000000..0ba0e6ef --- /dev/null +++ b/core/sdk/src/clients/producer_config.rs @@ -0,0 +1,53 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use std::sync::Arc; + +use iggy_common::{IggyByteSize, IggyDuration}; + +use crate::clients::producer_error_callback::ErrorCallback; +use crate::clients::producer_sharding::Sharding; + +#[derive(Debug, Clone)] +/// Determines how the `send_messages` API should behave when problem is encountered +pub enum BackpressureMode { + /// Block until the send succeeds + Block, + /// Block with a timeout, after which the send fails + BlockWithTimeout(IggyDuration), + /// Fail immediately without retrying + FailImmediately, +} + +#[derive(Debug)] +pub struct BackgroundConfig { + pub num_shards: usize, + pub batch_size: Option<usize>, + pub batch_length: Option<usize>, + pub failure_mode: BackpressureMode, + pub max_buffer_size: Option<IggyByteSize>, + pub max_in_flight: Option<usize>, + pub linger_time: IggyDuration, + pub error_callback: Arc<Box<dyn ErrorCallback + Send + Sync>>, + pub sharding: Box<dyn Sharding + Send + Sync>, +} + +#[derive(Clone)] +pub struct SyncConfig { + pub batch_length: usize, + pub linger_time: Option<IggyDuration>, +} diff --git a/core/sdk/src/clients/producer_dispatcher.rs b/core/sdk/src/clients/producer_dispatcher.rs new file mode 100644 index 00000000..c56df46d --- /dev/null +++ b/core/sdk/src/clients/producer_dispatcher.rs @@ -0,0 +1,447 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::clients::producer::ProducerCoreBackend; +use crate::clients::producer_config::{BackgroundConfig, BackpressureMode}; +use crate::clients::producer_error_callback::ErrorCtx; +use crate::clients::producer_sharding::{Shard, ShardMessage, ShardMessageWithPermits}; +use futures::FutureExt; +use iggy_common::{Identifier, IggyError, IggyMessage, Partitioning, Sizeable}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use tokio::sync::Semaphore; + +use tokio::{sync::Notify, task::JoinHandle}; + +pub struct ProducerDispatcher { + shards: Vec<Shard>, + config: Arc<BackgroundConfig>, + closed: AtomicBool, + slots_permit: Arc<Semaphore>, + bytes_permit: Arc<Semaphore>, + shutdown_notify: Arc<Notify>, + _join_handle: JoinHandle<()>, +} + +impl ProducerDispatcher { + pub fn new(core: Arc<impl ProducerCoreBackend>, config: BackgroundConfig) -> Self { + let mut shards = Vec::with_capacity(config.num_shards); + let config = Arc::new(config); + let shutdown_notify = Arc::new(Notify::new()); + + let (err_tx, err_rx) = flume::unbounded::<ErrorCtx>(); + let err_callback = config.error_callback.clone(); + let shutdown_notify_clone = shutdown_notify.clone(); + let handle = tokio::spawn(async move { + loop { + tokio::select! { + maybe_message = err_rx.recv_async() => { + match maybe_message { + Ok(ctx) => { + if let Err(panic) = std::panic::AssertUnwindSafe(err_callback.call(ctx)) + .catch_unwind() + .await + { + tracing::error!("error_callback panicked: {:?}", panic); + } + } + Err(_) => break + } + } + _ = shutdown_notify_clone.notified() => { + tracing::debug!("error-callback worker finished"); + break + } + } + } + }); + + for _ in 0..config.num_shards { + shards.push(Shard::new(core.clone(), config.clone(), err_tx.clone())); + } + + let bytes_permit = match config.max_buffer_size { + Some(val) => val.as_bytes_u32() as usize, + None => usize::MAX, + }; + let slot_permit = match config.max_in_flight { + Some(val) => val, + None => usize::MAX, + }; + + Self { + shards, + config, + closed: AtomicBool::new(false), + bytes_permit: Arc::new(Semaphore::new(bytes_permit)), + slots_permit: Arc::new(Semaphore::new(slot_permit)), + shutdown_notify, + _join_handle: handle, + } + } + + pub async fn dispatch( + &self, + messages: Vec<IggyMessage>, + stream: Arc<Identifier>, + topic: Arc<Identifier>, + partitioning: Option<Arc<Partitioning>>, + ) -> Result<(), IggyError> { + if self.closed.load(Ordering::Relaxed) { + return Err(IggyError::ProducerClosed); + } + + let shard_message = ShardMessage { + messages, + stream, + topic, + partitioning, + }; + let batch_bytes = shard_message.get_size_bytes(); + + let permit_bytes = match self + .bytes_permit + .clone() + .try_acquire_many_owned(batch_bytes.as_bytes_u32()) + { + Ok(perm) => perm, + Err(_) => match self.config.failure_mode { + BackpressureMode::FailImmediately => { + return Err(IggyError::BackgroundSendBufferOverflow); + } + BackpressureMode::Block => self + .bytes_permit + .clone() + .acquire_many_owned(batch_bytes.as_bytes_u32()) + .await + .map_err(|_| IggyError::BackgroundSendError)?, + BackpressureMode::BlockWithTimeout(timeout_dur) => { + match tokio::time::timeout( + timeout_dur.get_duration(), + self.bytes_permit + .clone() + .acquire_many_owned(batch_bytes.as_bytes_u32()), + ) + .await + { + Ok(Ok(perm)) => perm, + Ok(Err(_)) => return Err(IggyError::BackgroundSendError), + Err(_) => return Err(IggyError::BackgroundSendTimeout), + } + } + }, + }; + + let permit_slot = match self.slots_permit.clone().try_acquire_owned() { + Ok(perm) => perm, + Err(_) => match self.config.failure_mode { + BackpressureMode::FailImmediately => { + drop(permit_bytes); + return Err(IggyError::BackgroundSendError); + } + BackpressureMode::Block => match self.slots_permit.clone().acquire_owned().await { + Ok(perm) => perm, + Err(_) => { + drop(permit_bytes); + return Err(IggyError::BackgroundSendError); + } + }, + BackpressureMode::BlockWithTimeout(timeout_dur) => { + match tokio::time::timeout( + timeout_dur.get_duration(), + self.slots_permit.clone().acquire_owned(), + ) + .await + { + Ok(Ok(perm)) => perm, + Ok(Err(_)) => { + drop(permit_bytes); + return Err(IggyError::BackgroundSendError); + } + Err(_) => { + drop(permit_bytes); + return Err(IggyError::BackgroundSendTimeout); + } + } + } + }, + }; + + let shard_ix = self.config.sharding.pick_shard( + self.shards.len(), + &shard_message.messages, + &shard_message.stream, + &shard_message.topic, + ); + debug_assert!(shard_ix < self.shards.len()); + let shard = &self.shards[shard_ix]; + + shard + .send(ShardMessageWithPermits::new( + shard_message, + permit_bytes, + permit_slot, + )) + .await + } + + pub async fn shutdown(&self) { + if self.closed.swap(true, Ordering::Relaxed) { + return; + } + + for shard in &self.shards { + shard.shutdown().await; + } + + self.shutdown_notify.notify_waiters(); + } +} + +#[cfg(test)] +mod tests { + use std::pin::Pin; + use std::sync::atomic::AtomicUsize; + use std::time::Duration; + + use bytes::Bytes; + use tokio::time::sleep; + + use crate::clients::producer::MockProducerCoreBackend; + use crate::clients::producer_builder::BackgroundBuilder; + use crate::clients::producer_error_callback::ErrorCallback; + use crate::clients::producer_sharding::Sharding; + + use super::*; + + fn dummy_identifier() -> Arc<Identifier> { + Arc::new(Identifier::numeric(1).unwrap()) + } + + fn dummy_message(size: usize) -> IggyMessage { + IggyMessage::builder() + .payload(Bytes::from(vec![0u8; size])) + .build() + .unwrap() + } + + #[tokio::test] + async fn test_dispatch_successful() { + let mut mock = MockProducerCoreBackend::new(); + mock.expect_send_internal() + .times(1) + .returning(|_, _, _, _| Box::pin(async { Ok(()) })); + + let msg = dummy_message(5); + let config = BackgroundBuilder::default() + .max_buffer_size(100.into()) + .max_in_flight(10) + .num_shards(1) + .build(); + + let dispatcher = ProducerDispatcher::new(Arc::new(mock), config); + + let result = dispatcher + .dispatch(vec![msg], dummy_identifier(), dummy_identifier(), None) + .await; + + sleep(Duration::from_millis(100)).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_dispatch_fails_on_buffer_overflow_immediate() { + let mock = MockProducerCoreBackend::new(); + + let msg = dummy_message(200); + let config = BackgroundBuilder::default() + .max_buffer_size(100.into()) + .failure_mode(BackpressureMode::FailImmediately) + .num_shards(1) + .build(); + + let dispatcher = ProducerDispatcher::new(Arc::new(mock), config); + + let result = dispatcher + .dispatch(vec![msg], dummy_identifier(), dummy_identifier(), None) + .await; + + assert!(matches!( + result, + Err(IggyError::BackgroundSendBufferOverflow) + )); + } + + #[tokio::test] + async fn test_dispatch_times_out_on_block_with_timeout() { + let mock = MockProducerCoreBackend::new(); + + let msg = dummy_message(200); + let config = BackgroundBuilder::default() + .max_buffer_size(100.into()) + .max_in_flight(1) + .failure_mode(BackpressureMode::BlockWithTimeout( + Duration::from_millis(50).into(), + )) + .num_shards(1) + .build(); + + let dispatcher = ProducerDispatcher::new(Arc::new(mock), config); + + let _ = dispatcher + .bytes_permit + .clone() + .acquire_many_owned(100) + .await; + + let result = dispatcher + .dispatch(vec![msg], dummy_identifier(), dummy_identifier(), None) + .await; + + assert!(matches!(result, Err(IggyError::BackgroundSendTimeout))); + } + + #[tokio::test] + async fn test_dispatch_waits_and_succeeds_on_block_mode() { + let mut mock = MockProducerCoreBackend::new(); + mock.expect_send_internal() + .times(1) + .returning(|_, _, _, _| Box::pin(async { Ok(()) })); + + let msg = ShardMessage { + stream: dummy_identifier(), + topic: dummy_identifier(), + messages: vec![dummy_message(5)], + partitioning: None, + }; + + let config = BackgroundBuilder::default() + .max_buffer_size(msg.get_size_bytes()) + .max_in_flight(1) + .failure_mode(BackpressureMode::Block) + .num_shards(1) + .build(); + + let dispatcher = ProducerDispatcher::new(Arc::new(mock), config); + + let _block = dispatcher + .bytes_permit + .clone() + .acquire_many_owned(msg.get_size_bytes().as_bytes_u32()) + .await + .unwrap(); + + let msg_clone = ShardMessage { + stream: msg.stream.clone(), + topic: msg.topic.clone(), + messages: msg.messages, + partitioning: msg.partitioning.clone(), + }; + + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(100)).await; + drop(_block); + }); + + let result = dispatcher + .dispatch( + msg_clone.messages, + msg_clone.topic, + msg_clone.stream, + msg_clone.partitioning, + ) + .await; + + tokio::time::sleep(Duration::from_millis(200)).await; + assert!(result.is_ok()); + } + + #[derive(Clone, Debug)] + struct TestSharding { + called: Arc<AtomicUsize>, + } + + impl Sharding for TestSharding { + fn pick_shard( + &self, + num_shards: usize, + _messages: &[IggyMessage], + _stream: &Identifier, + _topic: &Identifier, + ) -> usize { + self.called.fetch_add(1, Ordering::SeqCst); + num_shards - 1 + } + } + + #[derive(Clone, Debug)] + struct TestErrorCallback { + called: Arc<AtomicUsize>, + } + + impl ErrorCallback for TestErrorCallback { + fn call(&self, _ctx: ErrorCtx) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> { + self.called.fetch_add(1, Ordering::SeqCst); + Box::pin(async {}) + } + } + + #[tokio::test] + async fn test_custom_sharding_and_error_callback() { + let mut mock = MockProducerCoreBackend::new(); + mock.expect_send_internal() + .times(1) + .returning(|_, _, _, _| { + Box::pin(async { + Err(IggyError::ProducerSendFailed { + cause: "some_error".to_string(), + failed: Arc::new(vec![dummy_message(10)]), + }) + }) + }); + + let sharding_called = Arc::new(AtomicUsize::new(0)); + let error_called = Arc::new(AtomicUsize::new(0)); + + let config = BackgroundBuilder::default() + .num_shards(1) + .error_callback(Box::new(TestErrorCallback { + called: error_called.clone(), + })) + .sharding(Box::new(TestSharding { + called: sharding_called.clone(), + })) + .build(); + + let dispatcher = ProducerDispatcher::new(Arc::new(mock), config); + + let result = dispatcher + .dispatch( + vec![dummy_message(10)], + dummy_identifier(), + dummy_identifier(), + None, + ) + .await; + + tokio::time::sleep(Duration::from_millis(100)).await; + + assert!(result.is_ok()); + assert_eq!(sharding_called.load(Ordering::SeqCst), 1); + assert_eq!(error_called.load(Ordering::SeqCst), 1); + } +} diff --git a/core/sdk/src/clients/producer_error_callback.rs b/core/sdk/src/clients/producer_error_callback.rs new file mode 100644 index 00000000..59a6dc45 --- /dev/null +++ b/core/sdk/src/clients/producer_error_callback.rs @@ -0,0 +1,67 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use std::fmt::Debug; +use std::pin::Pin; +use std::sync::Arc; + +use iggy_common::{Identifier, IggyMessage, Partitioning}; +use tracing::error; + +#[derive(Debug)] +pub struct ErrorCtx { + pub cause: String, + pub stream: Arc<Identifier>, + pub topic: Arc<Identifier>, + pub partitioning: Option<Arc<Partitioning>>, + pub messages: Arc<Vec<IggyMessage>>, +} + +/// A trait for handling background sending errors. +/// +/// This is used when a message batch fails to send in an asynchronous background task. +/// Implementors can define custom logic such as logging, retrying, alerting, etc. +pub trait ErrorCallback: Send + Sync + Debug + 'static { + fn call(&self, ctx: ErrorCtx) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>; +} + +/// Default implementation of [`ErrorCallback`] that logs the error using `tracing::error!`. +/// +/// Logs include stream, topic, optional partitioning, number of messages, and the cause. +#[derive(Debug, Default)] +pub struct LogErrorCallback; + +impl ErrorCallback for LogErrorCallback { + fn call(&self, ctx: ErrorCtx) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> { + Box::pin(async move { + let partitioning = ctx + .partitioning + .as_ref() + .map(|p| format!("{:?}", p)) + .unwrap_or_else(|| "None".to_string()); + + error!( + cause = ctx.cause, + stream = %ctx.stream, + topic = %ctx.topic, + partitioning = %partitioning, + num_messages = ctx.messages.len(), + "Failed to send messages in background task", + ); + }) + } +} diff --git a/core/sdk/src/clients/producer_sharding.rs b/core/sdk/src/clients/producer_sharding.rs new file mode 100644 index 00000000..ab3757ec --- /dev/null +++ b/core/sdk/src/clients/producer_sharding.rs @@ -0,0 +1,437 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; + +use iggy_common::{Identifier, IggyByteSize, IggyError, IggyMessage, Partitioning, Sizeable}; +use tokio::sync::{Notify, OwnedSemaphorePermit}; +use tokio::task::JoinHandle; +use tracing::error; + +use crate::clients::producer::ProducerCoreBackend; +use crate::clients::producer_config::BackgroundConfig; +use crate::clients::producer_error_callback::ErrorCtx; + +/// A strategy for distributing messages across shards. +/// +/// Implementors of this trait define how to choose a shard for a given batch of messages. +/// This allows customizing message routing based on message content, stream/topic identifiers, +/// or round-robin load balancing. +pub trait Sharding: Send + Sync + std::fmt::Debug + 'static { + fn pick_shard( + &self, + num_shards: usize, + messages: &[IggyMessage], + stream: &Identifier, + topic: &Identifier, + ) -> usize; +} + +/// A simple round-robin sharding strategy. +/// Distributes messages evenly across all shards by incrementing an atomic counter. +#[derive(Default, Debug)] +pub struct BalancedSharding { + counter: AtomicUsize, +} + +impl Sharding for BalancedSharding { + /// Picks the next shard in a round-robin fashion. + fn pick_shard( + &self, + num_shards: usize, + _: &[IggyMessage], + _: &Identifier, + _: &Identifier, + ) -> usize { + self.counter.fetch_add(1, Ordering::Relaxed) % num_shards + } +} + +#[derive(Debug)] +pub struct ShardMessage { + pub stream: Arc<Identifier>, + pub topic: Arc<Identifier>, + pub messages: Vec<IggyMessage>, + pub partitioning: Option<Arc<Partitioning>>, +} + +impl Sizeable for ShardMessage { + fn get_size_bytes(&self) -> IggyByteSize { + let mut total = IggyByteSize::new(0); + total += self.stream.get_size_bytes(); + total += self.topic.get_size_bytes(); + for msg in &self.messages { + total += msg.get_size_bytes(); + } + total + } +} + +pub struct ShardMessageWithPermits { + pub inner: ShardMessage, + _bytes_permit: Option<OwnedSemaphorePermit>, + _slot_permit: Option<OwnedSemaphorePermit>, +} + +impl ShardMessageWithPermits { + pub fn new( + msg: ShardMessage, + permit_bytes: OwnedSemaphorePermit, + permit_slot: OwnedSemaphorePermit, + ) -> Self { + Self { + inner: msg, + _bytes_permit: Some(permit_bytes), + _slot_permit: Some(permit_slot), + } + } +} + +pub struct Shard { + tx: flume::Sender<ShardMessageWithPermits>, + shutdown_notify: Arc<Notify>, + closed: Arc<AtomicBool>, + _handle: JoinHandle<()>, +} + +impl Shard { + pub fn new( + core: Arc<impl ProducerCoreBackend>, + config: Arc<BackgroundConfig>, + err_sender: flume::Sender<ErrorCtx>, + ) -> Self { + let (tx, rx) = flume::bounded::<ShardMessageWithPermits>(256); + let shutdown_notify = Arc::new(Notify::new()); + let closed = Arc::new(AtomicBool::new(false)); + + let shutdown_notify_clone = shutdown_notify.clone(); + let closed_clone = closed.clone(); + let handle = tokio::spawn(async move { + let mut buffer = Vec::new(); + let mut buffer_bytes = 0; + let mut last_flush = tokio::time::Instant::now(); + + loop { + let deadline = last_flush + config.linger_time.get_duration(); + tokio::select! { + maybe_msg = rx.recv_async() => { + match maybe_msg { + Ok(msg) => { + buffer_bytes += msg.inner.get_size_bytes().as_bytes_usize(); + buffer.push(msg); + + let exceed_batch_len = config.batch_length.is_some_and(|len| buffer.len() >= len); + let exceed_batch_size = config.batch_size.is_some_and(|size| buffer_bytes >= size); + + if exceed_batch_len || exceed_batch_size { + Self::flush_buffer(&core, &mut buffer, &mut buffer_bytes, &err_sender).await; + last_flush = tokio::time::Instant::now(); + } + } + Err(_) => break, + } + } + _ = tokio::time::sleep_until(deadline) => { + if !buffer.is_empty() { + Self::flush_buffer(&core, &mut buffer, &mut buffer_bytes, &err_sender).await; + last_flush = tokio::time::Instant::now(); + } + } + _ = shutdown_notify_clone.notified() => { + closed_clone.store(true, Ordering::Release); + if !buffer.is_empty() { + Self::flush_buffer(&core, &mut buffer, &mut buffer_bytes, &err_sender).await; + } + break; + } + } + } + }); + + Self { + tx, + shutdown_notify, + closed, + _handle: handle, + } + } + + async fn flush_buffer( + core: &Arc<impl ProducerCoreBackend>, + buffer: &mut Vec<ShardMessageWithPermits>, + buffer_bytes: &mut usize, + err_sender: &flume::Sender<ErrorCtx>, + ) { + for msg in buffer.drain(..) { + let result = core + .send_internal( + &msg.inner.stream, + &msg.inner.topic, + msg.inner.messages, + msg.inner.partitioning.clone(), + ) + .await; + + if let Err(err) = result { + if let IggyError::ProducerSendFailed { failed, cause } = &err { + let ctx = ErrorCtx { + cause: cause.clone(), + stream: msg.inner.stream, + topic: msg.inner.topic, + partitioning: msg.inner.partitioning, + messages: failed.clone(), + }; + let _ = err_sender.send_async(ctx).await; + } else { + tracing::error!("background send failed: {err}"); + } + } + } + *buffer_bytes = 0; + } + + pub(crate) async fn send(&self, message: ShardMessageWithPermits) -> Result<(), IggyError> { + if self.closed.load(Ordering::Acquire) { + return Err(IggyError::ProducerClosed); + } + + self.tx.send_async(message).await.map_err(|e| { + error!("Failed to send_async: {e}"); + IggyError::BackgroundSendError + }) + } + + pub(crate) async fn shutdown(&self) { + self.shutdown_notify.notify_waiters(); + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use bytes::Bytes; + use iggy_common::IggyDuration; + use tokio::{ + sync::{Notify, Semaphore}, + time::sleep, + }; + + use super::*; + use crate::clients::{producer::MockProducerCoreBackend, producer_builder::BackgroundBuilder}; + + fn dummy_identifier() -> Arc<Identifier> { + Arc::new(Identifier::numeric(1).unwrap()) + } + + fn dummy_message(size: usize) -> IggyMessage { + IggyMessage::builder() + .payload(Bytes::from(vec![0u8; size])) + .build() + .unwrap() + } + + #[tokio::test] + async fn test_shard_flushes_by_batch_length() { + let mut mock = MockProducerCoreBackend::new(); + mock.expect_send_internal() + .times(10) + .returning(|_, _, _, _| Box::pin(async { Ok(()) })); + + let bb = BackgroundBuilder::default() + .batch_length(10) + .linger_time(IggyDuration::new_from_secs(1)) + .batch_size(10_000); + let config = Arc::new(bb.build()); + + let (permit_bytes, permit_slot) = ( + Arc::new(Semaphore::new(100_000)), + Arc::new(Semaphore::new(100)), + ); + + let shard = Shard::new(Arc::new(mock), config, flume::unbounded().0); + + for _ in 0..10 { + let message = ShardMessage { + stream: dummy_identifier(), + topic: dummy_identifier(), + messages: vec![dummy_message(1)], + partitioning: None, + }; + let wrapped = ShardMessageWithPermits::new( + message, + permit_bytes.clone().acquire_many_owned(1).await.unwrap(), + permit_slot.clone().acquire_owned().await.unwrap(), + ); + shard.send(wrapped).await.unwrap(); + } + + sleep(Duration::from_millis(100)).await; + } + + #[tokio::test] + async fn test_shard_flushes_by_batch_size() { + let mut mock = MockProducerCoreBackend::new(); + mock.expect_send_internal() + .times(1) + .returning(|_, _, _, _| Box::pin(async { Ok(()) })); + + let bb = BackgroundBuilder::default() + .batch_length(1000) + .linger_time(IggyDuration::new_from_secs(1)) + .batch_size(10_000); + let config = Arc::new(bb.build()); + + let (permit_bytes, permit_slot) = ( + Arc::new(Semaphore::new(10_000)), + Arc::new(Semaphore::new(100)), + ); + + let shard = Shard::new(Arc::new(mock), config, flume::unbounded().0); + + let message = ShardMessage { + stream: dummy_identifier(), + topic: dummy_identifier(), + messages: vec![dummy_message(10_000)], + partitioning: None, + }; + let wrapped = ShardMessageWithPermits::new( + message, + permit_bytes + .clone() + .acquire_many_owned(10_000) + .await + .unwrap(), + permit_slot.clone().acquire_owned().await.unwrap(), + ); + shard.send(wrapped).await.unwrap(); + + sleep(Duration::from_millis(100)).await; + } + + #[tokio::test] + async fn test_shard_flushes_by_timeout() { + let mut mock = MockProducerCoreBackend::new(); + mock.expect_send_internal() + .times(1) + .returning(|_, _, _, _| Box::pin(async { Ok(()) })); + + let bb = BackgroundBuilder::default() + .batch_length(10) + .linger_time(IggyDuration::new(Duration::from_millis(50))) + .batch_size(10_000); + let config = Arc::new(bb.build()); + + let (permit_bytes, permit_slot) = ( + Arc::new(Semaphore::new(10_000)), + Arc::new(Semaphore::new(100)), + ); + + let shard = Shard::new(Arc::new(mock), config, flume::unbounded().0); + + let message = ShardMessage { + stream: dummy_identifier(), + topic: dummy_identifier(), + messages: vec![dummy_message(1)], + partitioning: None, + }; + let wrapped = ShardMessageWithPermits::new( + message, + permit_bytes.clone().acquire_many_owned(1).await.unwrap(), + permit_slot.clone().acquire_owned().await.unwrap(), + ); + shard.send(wrapped).await.unwrap(); + + sleep(Duration::from_millis(100)).await; + } + + #[tokio::test] + async fn test_shard_forwards_error() { + let mut mock = MockProducerCoreBackend::new(); + let error = IggyError::ProducerSendFailed { + failed: Arc::new(vec![dummy_message(1)]), + cause: "test error".into(), + }; + + mock.expect_send_internal().returning(move |_, _, _, _| { + let err = error.clone(); + Box::pin(async move { Err(err) }) + }); + + let (err_tx, err_rx) = flume::unbounded(); + let bb = BackgroundBuilder::default(); + let config = Arc::new(bb.build()); + + let (permit_bytes, permit_slot) = ( + Arc::new(Semaphore::new(10_000)), + Arc::new(Semaphore::new(100)), + ); + + let shard = Shard::new(Arc::new(mock), config, err_tx); + + let message = ShardMessage { + stream: dummy_identifier(), + topic: dummy_identifier(), + messages: vec![dummy_message(1)], + partitioning: None, + }; + let wrapped = ShardMessageWithPermits::new( + message, + permit_bytes.clone().acquire_many_owned(1).await.unwrap(), + permit_slot.clone().acquire_owned().await.unwrap(), + ); + shard.send(wrapped).await.unwrap(); + + let err_ctx = err_rx.recv_async().await.unwrap(); + assert_eq!(err_ctx.cause, "test error"); + assert_eq!(err_ctx.messages.len(), 1); + } + + #[tokio::test] + async fn test_shard_send_error_on_closed_channel() { + let (tx, rx) = flume::bounded::<ShardMessageWithPermits>(1); + drop(rx); + + let shard = Shard { + tx, + shutdown_notify: Arc::new(Notify::new()), + closed: Arc::new(AtomicBool::new(false)), + _handle: tokio::spawn(async {}), + }; + + let (permit_bytes, permit_slot) = ( + Arc::new(Semaphore::new(10_000)), + Arc::new(Semaphore::new(100)), + ); + + let message = ShardMessage { + stream: dummy_identifier(), + topic: dummy_identifier(), + messages: vec![dummy_message(1)], + partitioning: None, + }; + let wrapped = ShardMessageWithPermits::new( + message, + permit_bytes.clone().acquire_many_owned(1).await.unwrap(), + permit_slot.clone().acquire_owned().await.unwrap(), + ); + + let result = shard.send(wrapped).await; + assert!(matches!(result, Err(IggyError::BackgroundSendError))); + } +} diff --git a/core/sdk/src/stream_builder/build/build_iggy_producer.rs b/core/sdk/src/stream_builder/build/build_iggy_producer.rs index 9b9a1cdf..afe8d122 100644 --- a/core/sdk/src/stream_builder/build/build_iggy_producer.rs +++ b/core/sdk/src/stream_builder/build/build_iggy_producer.rs @@ -56,8 +56,6 @@ pub(crate) async fn build_iggy_producer( trace!("Build iggy producer"); let mut builder = client .producer(stream, topic)? - .batch_length(batch_length) - .linger_time(linger_time) .partitioning(partitioning) .create_stream_if_not_exists() .send_retries(send_retries, send_retries_interval) @@ -66,14 +64,15 @@ pub(crate) async fn build_iggy_producer( topic_replication_factor, IggyExpiry::ServerDefault, MaxTopicSize::ServerDefault, - ); + ) + .sync(|b| b.batch_length(batch_length).linger_time(linger_time)); if let Some(encryptor) = config.encryptor() { builder = builder.encryptor(encryptor); } trace!("Initialize iggy producer"); - let mut producer = builder.build(); + let producer = builder.build(); producer.init().await.map_err(|err| { error!("Failed to initialize consumer: {err}"); err
