This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch feat/add-background-send in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 1263492d30cebf2515e7cc622b5c6d473d14fac0 Author: haze518 <[email protected]> AuthorDate: Mon Jun 2 06:24:15 2025 +0600 del --- core/common/src/error/iggy_error.rs | 11 +- .../tests/examples/test_new_publisher.rs | 3 +- core/sdk/src/clients/mod.rs | 1 - core/sdk/src/clients/producer.rs | 216 +++++++++++------- core/sdk/src/clients/producer_builder.rs | 10 - core/sdk/src/clients/producer_dispatcher.rs | 253 +++++++++++++++++---- core/sdk/src/clients/send_mode.rs | 131 ----------- 7 files changed, 352 insertions(+), 273 deletions(-) diff --git a/core/common/src/error/iggy_error.rs b/core/common/src/error/iggy_error.rs index 9fe20359..487224f1 100644 --- a/core/common/src/error/iggy_error.rs +++ b/core/common/src/error/iggy_error.rs @@ -16,8 +16,10 @@ * under the License. */ -use crate::utils::byte_size::IggyByteSize; +use std::sync::Arc; + use crate::utils::topic_size::MaxTopicSize; +use crate::{IggyMessage, utils::byte_size::IggyByteSize}; use strum::{EnumDiscriminants, FromRepr, IntoStaticStr}; use thiserror::Error; @@ -371,6 +373,13 @@ pub enum IggyError { BackgroundWorkerDisconnected = 4054, #[error("Background send buffer overflow")] BackgroundSendBufferOverflow = 4055, + #[error("Producer send failed")] + ProducerSendFailed { + cause: String, + failed: Arc<Vec<IggyMessage>>, + } = 4056, + #[error("Producer closed")] + ProducerClosed = 4057, #[error("Invalid offset: {0}")] InvalidOffset(u64) = 4100, #[error("Consumer group with ID: {0} for topic with ID: {1} was not found.")] diff --git a/core/integration/tests/examples/test_new_publisher.rs b/core/integration/tests/examples/test_new_publisher.rs index d9a2517e..5e425e2d 100644 --- a/core/integration/tests/examples/test_new_publisher.rs +++ b/core/integration/tests/examples/test_new_publisher.rs @@ -6,7 +6,6 @@ use std::time::Instant; use bytes::Bytes; use futures::StreamExt; -use iggy::clients::send_mode::{BackgroundConfig, BackpressureMode, SendMode}; use iggy::prelude::defaults::*; use iggy::prelude::*; 
use iggy::{clients::client::IggyClient, prelude::TcpClient}; @@ -45,7 +44,7 @@ async fn test_new_publisher() { .producer("1", "1") .unwrap() .batch_length(10) - .send_mode(SendMode::Background) + // .send_mode(SendMode::Background) .build(); producer.init().await.unwrap(); diff --git a/core/sdk/src/clients/mod.rs b/core/sdk/src/clients/mod.rs index 6f8f5f7c..7524e9e5 100644 --- a/core/sdk/src/clients/mod.rs +++ b/core/sdk/src/clients/mod.rs @@ -32,7 +32,6 @@ pub mod consumer; pub mod consumer_builder; pub mod producer; pub mod producer_builder; -pub mod send_mode; mod producer_dispatcher; const ORDERING: std::sync::atomic::Ordering = std::sync::atomic::Ordering::SeqCst; diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs index 2b45ae38..ed22d5a3 100644 --- a/core/sdk/src/clients/producer.rs +++ b/core/sdk/src/clients/producer.rs @@ -1,4 +1,3 @@ -use super::send_mode::{BackgroundConfig, BackpressureMode, Dispatcher, SendMode, shardMessage}; /* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -26,13 +25,13 @@ use iggy_common::{ CompressionAlgorithm, DiagnosticEvent, EncryptorKind, IdKind, Identifier, IggyDuration, IggyError, IggyExpiry, IggyMessage, IggyTimestamp, MaxTopicSize, Partitioner, Partitioning, }; +use super::producer_dispatcher::{ProducerDispatcher, BalancedSharding, Sharding}; use std::fmt::Debug; +use std::marker::PhantomData; use std::sync::Arc; use std::sync::atomic::Ordering; use std::sync::atomic::{AtomicBool, AtomicU64}; use std::time::Duration; -use tokio::sync::Semaphore; -use tokio::task::JoinHandle; use tokio::time::{Interval, sleep}; use tracing::{error, info, trace, warn}; @@ -74,12 +73,6 @@ pub struct ProducerCore { last_sent_at: Arc<AtomicU64>, send_retries_count: Option<u32>, send_retries_interval: Option<IggyDuration>, - - _join_handle: Option<JoinHandle<()>>, - sema: Arc<Semaphore>, - sender: Option<Arc<flume::Sender<Vec<IggyMessage>>>>, - error_callback: Option<Arc<dyn ErrorCallback>>, - shard_number: usize, } impl ProducerCore { @@ -191,37 +184,75 @@ impl ProducerCore { }); } - pub(crate) async fn send_internal( - &self, - stream: &Identifier, - topic: &Identifier, - mut msgs: Vec<IggyMessage>, - partitioning: Option<Arc<Partitioning>>, - ) -> Result<(), IggyError> { - if msgs.is_empty() { - return Ok(()); +// если не надо будет разбивать на чанки, то можем не кидать msg в ошибке +// достаточно будет добавить функцию, которая будет принимать массив. +// Тогда мы сможем передавать массив для отправки туда, при этом сохраняя доступ до сообщений. +// Надо будет подумать над этим. 
+// +pub(crate) async fn send_internal( + &self, + stream: &Identifier, + topic: &Identifier, + mut msgs: Vec<IggyMessage>, + partitioning: Option<Arc<Partitioning>>, + split_chunks: bool +) -> Result<(), IggyError> { + if msgs.is_empty() { + return Ok(()); + } + + if let Err(err) = self.encrypt_messages(&mut msgs) { + return Err(IggyError::ProducerSendFailed { + cause: err.to_string(), + failed: Arc::new(msgs), + }); + } + + let part = match self.get_partitioning(stream, topic, &msgs, partitioning.clone()) { + Ok(p) => p, + Err(err) => { + return Err(IggyError::ProducerSendFailed { + cause: err.to_string(), + failed: Arc::new(msgs), + }); } + }; - self.encrypt_messages(&mut msgs)?; + if !self.can_send_immediately && self.linger_time_micros > 0 { + Self::wait_before_sending(self.linger_time_micros, + self.last_sent_at.load(ORDERING)).await; + } - let part = self.get_partitioning(stream, topic, &msgs, partitioning)?; + if !split_chunks { + self.try_send_messages(stream, topic, &part, &mut msgs).await.map_err(|err| IggyError::ProducerSendFailed { + cause: err.to_string(), + failed: Arc::new(msgs), + })?; + self.last_sent_at.store(IggyTimestamp::now().into(), ORDERING); + return Ok(()) + } - // todo add batch_size or batch_length - let max = self.batch_length.unwrap_or(MAX_BATCH_LENGTH); + let max = self.batch_length.unwrap_or(MAX_BATCH_LENGTH); - if !self.can_send_immediately && self.linger_time_micros > 0 { - Self::wait_before_sending(self.linger_time_micros, self.last_sent_at.load(ORDERING)) - .await; - } + let mut index = 0; + while index < msgs.len() { + let end = (index + max).min(msgs.len()); + let chunk = &mut msgs[index..end]; - for chunk in msgs.chunks_mut(max) { - self.last_sent_at - .store(IggyTimestamp::now().into(), ORDERING); - self.try_send_messages(stream, topic, &part, chunk).await?; + if let Err(err) = self.try_send_messages(stream, topic, &part, chunk).await { + let failed_tail = msgs.split_off(index); + return Err(IggyError::ProducerSendFailed { + 
cause: err.to_string(), + failed: Arc::new(failed_tail), + }); } - Ok(()) + self.last_sent_at.store(IggyTimestamp::now().into(), ORDERING); + index = end; } + Ok(()) +} + async fn try_send_messages( &self, stream: &Identifier, @@ -392,20 +423,62 @@ impl ProducerCore { } } -pub trait ErrorCallback: Send + Sync + Debug { - fn call(&self, error: IggyError, messages: Vec<IggyMessage>); +pub struct ErrorCtx { + pub cause: String, + pub stream: Arc<Identifier>, + pub topic: Arc<Identifier>, + pub partitioning: Option<Arc<Partitioning>>, + pub messages: Arc<Vec<IggyMessage>>, +} + +pub trait ErrorCallback: Send + Sync + Debug + 'static { + fn call(&self, ctx: ErrorCtx) -> impl Future<Output = ()> + Send; +} + +pub struct LogErrorCallback; + +impl std::fmt::Debug for LogErrorCallback { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LogErrorCallback").finish() + } +} + +impl ErrorCallback for LogErrorCallback { + fn call(&self, ctx: ErrorCtx) -> impl Future<Output = ()> + Send { + async move { + let partitioning = ctx.partitioning + .as_ref() + .map(|p| format!("{:?}", p)) + .unwrap_or_else(|| "None".to_string()); + + error!( + cause = ctx.cause, + stream = %ctx.stream, + topic = %ctx.topic, + partitioning = %partitioning, + num_messages = ctx.messages.len(), + "Failed to send messages in background task", + ); + } + } } -unsafe impl Send for IggyProducer {} -unsafe impl Sync for IggyProducer {} -pub struct IggyProducer { +unsafe impl<S: Sharding, E: ErrorCallback> Send for IggyProducer<S, E> {} +unsafe impl<S: Sharding, E: ErrorCallback> Sync for IggyProducer<S, E> {} + +pub struct IggyProducer<S: Sharding, E: ErrorCallback> { core: Arc<ProducerCore>, - send_mode: SendMode, - dispatcher: Option<Dispatcher>, + dispatcher: Option<ProducerDispatcher<S, E>>, + + _phantom: PhantomData<E>, } -impl IggyProducer { +impl<S, E> IggyProducer<S, E> +where + S: Sharding, + E: ErrorCallback, +{ #[allow(clippy::too_many_arguments)] pub(crate) fn 
new( client: IggySharedMut<Box<dyn Client>>, @@ -426,8 +499,7 @@ impl IggyProducer { topic_max_size: MaxTopicSize, send_retries_count: Option<u32>, send_retries_interval: Option<IggyDuration>, - send_mode: SendMode, - error_callback: Option<Arc<dyn ErrorCallback>>, + background_config: Option<BackgroundConfig<S, E>>, ) -> Self { let core = Arc::new(ProducerCore { initialized: AtomicBool::new(true), @@ -453,27 +525,16 @@ impl IggyProducer { last_sent_at: Arc::new(AtomicU64::new(0)), send_retries_count, send_retries_interval, - _join_handle: None, - sema: Arc::new(Semaphore::new(10)), - sender: None, - error_callback, - shard_number: default_shard_count(), }); - let num_shards = default_shard_count(); // todo потом заменмить на норм значепние - let config = BackgroundConfig { - max_in_flight: 4, - in_flight_timeout: None, - batch_size: None, - failure_mode: BackpressureMode::Block, + let dispatcher = match background_config { + Some(config) => { + Some(ProducerDispatcher::new(core.clone(), config)) + } + None => None }; - let mut dispatcher = None; - if send_mode == SendMode::Background { - dispatcher = Some(Dispatcher::new(core.clone(), num_shards, Arc::new(config))); - } Self { core, - send_mode, dispatcher: dispatcher, } } @@ -502,24 +563,9 @@ impl IggyProducer { let stream_id = self.core.stream_id.clone(); let topic_id = self.core.topic_id.clone(); - match &self.send_mode { - SendMode::Sync => { - self.core - .send_internal(&stream_id, &topic_id, messages, None) - .await - } - SendMode::Background => match &self.dispatcher { - Some(disp) => disp - .dispatch(shardMessage { - messages, - stream: stream_id, - topic: topic_id, - partitioning: None, - }) - .await - .map_err(|err| IggyError::BackgroundSendError), - None => Ok(()), - }, + match &self.dispatcher { + Some(disp) => disp.dispatch(messages, stream_id, topic_id, None).await, + None => self.core.send_internal(&stream_id, &topic_id, messages, None, true).await, } } @@ -537,12 +583,13 @@ impl IggyProducer { 
return Ok(()); } - let stream_id = &self.core.stream_id; - let topic_id = &self.core.topic_id; + let stream_id = self.core.stream_id.clone(); + let topic_id = self.core.topic_id.clone(); - self.core - .send_internal(stream_id, topic_id, messages, partitioning) - .await + match &self.dispatcher { + Some(disp) => disp.dispatch(messages, stream_id, topic_id, partitioning).await, + None => self.core.send_internal(&stream_id, &topic_id, messages, partitioning, true).await, + } } pub async fn send_to( @@ -557,10 +604,17 @@ impl IggyProducer { return Ok(()); } + // todo add send via dispatcher self.core - .send_internal(&stream, &topic, messages, partitioning) + .send_internal(&stream, &topic, messages, partitioning, true) .await } + + pub async fn shutdown(self) { + if let Some(disp) = self.dispatcher { + disp.shutdown().await; + } + } } fn default_shard_count() -> usize { diff --git a/core/sdk/src/clients/producer_builder.rs b/core/sdk/src/clients/producer_builder.rs index 4d9590f4..89843800 100644 --- a/core/sdk/src/clients/producer_builder.rs +++ b/core/sdk/src/clients/producer_builder.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. 
-use super::send_mode::{self, SendMode}; use super::MAX_BATCH_LENGTH; use crate::prelude::{IggyProducer, ErrorCallback}; use iggy_binary_protocol::Client; @@ -37,7 +36,6 @@ pub struct IggyProducerBuilder { encryptor: Option<Arc<EncryptorKind>>, partitioner: Option<Arc<dyn Partitioner>>, linger_time: Option<IggyDuration>, - send_mode: SendMode, create_stream_if_not_exists: bool, create_topic_if_not_exists: bool, topic_partitions_count: u32, @@ -70,7 +68,6 @@ impl IggyProducerBuilder { partitioning: None, encryptor, partitioner, - send_mode: SendMode::default(), linger_time: Some(IggyDuration::from(1000)), create_stream_if_not_exists: true, create_topic_if_not_exists: true, @@ -130,13 +127,6 @@ impl IggyProducerBuilder { } } - pub fn send_mode(self, send_mode: SendMode) -> Self { - Self { - send_mode, - ..self - } - } - pub fn error_callback(self, cb: Arc<dyn ErrorCallback>) -> Self { Self { error_callback: Some(cb), diff --git a/core/sdk/src/clients/producer_dispatcher.rs b/core/sdk/src/clients/producer_dispatcher.rs index 2f6a25d7..818c958c 100644 --- a/core/sdk/src/clients/producer_dispatcher.rs +++ b/core/sdk/src/clients/producer_dispatcher.rs @@ -1,15 +1,41 @@ -use std::sync::{ - Arc, - atomic::{AtomicUsize, Ordering}, -}; +use std::{sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, Arc +}, time::Duration}; use iggy_common::{ Identifier, IggyByteSize, IggyDuration, IggyError, IggyMessage, Partitioning, Sizeable, }; use tokio::{sync::Notify, task::JoinHandle}; use tracing::error; +use futures::FutureExt; + +use crate::prelude::ErrorCallback; + +#[derive(Debug, Clone)] +/// Determines how the `send_messages` API should behave when problem is encountered +pub enum BackpressureMode { + /// Block until the send succeeds + Block, + /// Block with a timeout, after which the send fails + BlockWithTimeout(IggyDuration), + /// Fail immediately without retrying + FailImmediately, +} + +#[derive(Debug, Clone)] +pub struct BackgroundConfig<S: Sharding, E: ErrorCallback> 
{ + pub max_in_flight: usize, + pub in_flight_timeout: Option<IggyDuration>, + pub batch_size: Option<usize>, + pub batch_length: Option<usize>, + pub failure_mode: BackpressureMode, + pub buffer_size: Option<IggyByteSize>, + pub linger_time: Option<IggyDuration>, + pub error_callback: Arc<E>, + pub sharding: Box<S>, +} -use super::{producer::ProducerCore, send_mode::BackgroundConfig}; +use super::{producer::{ErrorCtx, ProducerCore}}; #[derive(Debug)] pub struct ShardMessage { @@ -38,23 +64,123 @@ pub struct Shard { } impl Shard { + pub fn new_( + core: Arc<ProducerCore>, + global_buffer: Arc<AtomicUsize>, + config: Arc<BackgroundConfig<impl Sharding, impl ErrorCallback>>, + notify: Arc<Notify>, + err_sender: flume::Sender<ErrorCtx>, + ) -> Self { + let (tx, rx) = flume::bounded::<ShardMessage>(1024); + + let handle = tokio::spawn(async move { + let mut buffer = Vec::new(); + let mut buffer_bytes = 0usize; + let mut last_flush = tokio::time::Instant::now(); + + loop { + tokio::select! { + maybe_msg = rx.recv_async() => { + match maybe_msg { + Ok(msg) => { + buffer_bytes += msg.get_size_bytes().as_bytes_usize(); + buffer.push(msg); + + let exceed_batch_len = config.batch_length + .map_or(false, |len| buffer.len() >= len); + let exceed_batch_size = config.batch_size + .map_or(false, |size| buffer_bytes >= size); + + if exceed_batch_len || exceed_batch_size { + Self::flush_buffer(&core, &mut buffer, &mut buffer_bytes, &global_buffer, &err_sender, &notify).await; + last_flush = tokio::time::Instant::now(); + } + } + Err(_) => break, + } + } + _ = tokio::time::sleep(config.linger_time.map_or(Duration::from_secs(u64::MAX), |d| d.get_duration())) => { + if !buffer.is_empty() { + Self::flush_buffer(&core, &mut buffer, &mut buffer_bytes, &global_buffer, &err_sender, &notify).await; + last_flush = tokio::time::Instant::now(); + } + } + } + } + }); + } + + async fn flush_buffer( + core: &Arc<ProducerCore>, + buffer: &mut Vec<ShardMessage>, + buffer_bytes: &mut usize, + 
global_buffer: &Arc<AtomicUsize>, + err_sender: &flume::Sender<ErrorCtx>, + notify: &Arc<Notify>, + ) { + for msg in buffer.drain(..) { + let size = msg.get_size_bytes().as_bytes_usize(); + if let Err(err) = core + .send_internal(&msg.stream, &msg.topic, msg.messages, msg.partitioning.clone(), false) + .await + { + if let IggyError::ProducerSendFailed { failed, cause } = &err { + let ctx = ErrorCtx { + cause: cause.clone(), + stream: msg.stream, + topic: msg.topic, + partitioning: msg.partitioning, + messages: failed.clone(), + }; + if err_sender.send_async(ctx).await.is_err() { + tracing::warn!("error-queue receiver has been dropped; lost error report"); + } + } else { + tracing::error!("background send failed: {err}"); + } + } + + global_buffer.fetch_sub(size, Ordering::Relaxed); + notify.notify_waiters(); + } + *buffer_bytes = 0; + } + pub fn new( core: Arc<ProducerCore>, - current_buffered: Arc<AtomicUsize>, + current_buffered_size: Arc<AtomicUsize>, notify: Arc<Notify>, + err_sender: flume::Sender<ErrorCtx>, ) -> Self { let (tx, rx) = flume::bounded::<ShardMessage>(10); // use from config let handle = tokio::spawn(async move { while let Ok(msg) = rx.recv_async().await { let size = msg.get_size_bytes(); - if let Err(e) = core - .send_internal(&msg.stream, &msg.topic, msg.messages, msg.partitioning) + if let Err(err) = core + .send_internal( + &msg.stream, + &msg.topic, + msg.messages, + msg.partitioning.clone(), + ) .await { - // send to err chan - // error!("{:?}", e); + if let IggyError::ProducerSendFailed { failed, cause } = &err { + let ctx = ErrorCtx { + cause: cause.clone(), + stream: msg.stream, + topic: msg.topic, + partitioning: msg.partitioning, + messages: failed.clone(), + }; + if err_sender.send_async(ctx).await.is_err() { + tracing::warn!("error-queue receiver has been dropped; lost error report"); + } + } else { + tracing::error!("background send failed: {err}"); + } } - current_buffered.fetch_sub(size.as_bytes_usize(), Ordering::Relaxed); + 
current_buffered_size.fetch_sub(size.as_bytes_usize(), Ordering::Relaxed); notify.notify_waiters(); } }); @@ -95,9 +221,14 @@ impl Shard { IggyError::BackgroundSendError }) } + + async fn shutdown(self) { + drop(self.tx); + let _ = self._handle.await; + } } -pub trait Sharding { +pub trait Sharding: Default + Send + Sync + 'static { fn pick_shard( &self, shards: &[Shard], @@ -107,11 +238,11 @@ pub trait Sharding { ) -> usize; } -pub struct RoundRobinSharding { +pub struct BalancedSharding { counter: AtomicUsize, } -impl Default for RoundRobinSharding { +impl Default for BalancedSharding { fn default() -> Self { Self { counter: AtomicUsize::new(0), @@ -119,7 +250,7 @@ impl Default for RoundRobinSharding { } } -impl Sharding for RoundRobinSharding { +impl Sharding for BalancedSharding { fn pick_shard( &self, shards: &[Shard], @@ -131,55 +262,62 @@ impl Sharding for RoundRobinSharding { } } -pub enum Backpressure { - /// Block until the send succeeds - Block, - /// Block with a timeout, after which the send fails - BlockWithTimeout(IggyDuration), - /// Fail immediately without retrying - FailImmediately, -} - -struct ProducerDispatcher<S: Sharding> { +pub struct ProducerDispatcher<S: Sharding, E: ErrorCallback> { core: Arc<ProducerCore>, - backpressure: Backpressure, - sharding: S, shards: Vec<Shard>, - current_buffered: Arc<AtomicUsize>, + current_buffered: Arc<AtomicUsize>, // todo добавить аналог, но по length(in-flight) notify: Arc<Notify>, - config: Arc<BackgroundConfig>, + config: Arc<BackgroundConfig<S, E>>, + closed: AtomicBool, + _join_handle: JoinHandle<()>, } -impl<S> ProducerDispatcher<S> +impl<S, E> ProducerDispatcher<S, E> where S: Sharding, + E: ErrorCallback, { pub fn new( core: Arc<ProducerCore>, - backpressure: Backpressure, - config: Arc<BackgroundConfig>, - sharding: S, + config: BackgroundConfig<S, E>, ) -> Self { let mut shards = Vec::with_capacity(config.max_in_flight); - let current_buffered = Arc::new(AtomicUsize::new(0)); + let 
current_buffered_size = Arc::new(AtomicUsize::new(0)); + // let current_buffered_length = Arc::new(AtomicUsize::new(0)); let notify = Arc::new(Notify::new()); + let config = Arc::new(config); + + let (err_tx, err_rx) = flume::unbounded::<ErrorCtx>(); + let err_callback = config.error_callback.clone(); + let handle = tokio::spawn(async move { + while let Ok(ctx) = err_rx.recv_async().await { + if let Err(panic) = std::panic::AssertUnwindSafe(err_callback.call(ctx)) + .catch_unwind() + .await + { + tracing::error!("error_callback panicked: {:?}", panic); + } + } + tracing::debug!("error-callback worker finished"); + }); for _ in 0..config.max_in_flight { shards.push(Shard::new( core.clone(), - current_buffered.clone(), + current_buffered_size.clone(), notify.clone(), + err_tx.clone(), )); } Self { core, - backpressure, - sharding, shards, - current_buffered, + current_buffered: current_buffered_size, config, notify, + closed: AtomicBool::new(false), + _join_handle: handle, } } @@ -190,6 +328,10 @@ where topic: Arc<Identifier>, partitioning: Option<Arc<Partitioning>>, ) -> Result<(), IggyError> { + if self.closed.load(Ordering::Relaxed) { + return Err(IggyError::ProducerClosed); + } + let shard_message = ShardMessage { messages, stream, @@ -207,12 +349,12 @@ where if buffer_size.as_bytes_usize() != 0 && reserved + batch_bytes.as_bytes_usize() > buffer_size.as_bytes_usize() { - match self.backpressure { - Backpressure::Block => { + match self.config.failure_mode { + BackpressureMode::Block => { self.notify.notified().await; continue; } - Backpressure::BlockWithTimeout(t) => { + BackpressureMode::BlockWithTimeout(t) => { if tokio::time::timeout(t.get_duration(), self.notify.notified()) .await .is_err() @@ -221,7 +363,7 @@ where } continue; } - Backpressure::FailImmediately => { + BackpressureMode::FailImmediately => { return Err(IggyError::BackgroundSendBufferOverflow); } }; @@ -238,7 +380,7 @@ where } } - let shard_ix = self.sharding.pick_shard( + let shard_ix = 
self.config.sharding.pick_shard( &self.shards, &shard_message.messages, &shard_message.stream, @@ -246,10 +388,12 @@ where ); let shard = self.shards.get(shard_ix).unwrap(); - let result = match self.backpressure { - Backpressure::Block => shard.send_with_block(shard_message).await, - Backpressure::BlockWithTimeout(t) => shard.send_with_timeout(shard_message, t).await, - Backpressure::FailImmediately => shard.send_with_fail(shard_message), + let result = match self.config.failure_mode { + BackpressureMode::Block => shard.send_with_block(shard_message).await, + BackpressureMode::BlockWithTimeout(t) => { + shard.send_with_timeout(shard_message, t).await + } + BackpressureMode::FailImmediately => shard.send_with_fail(shard_message), }; if result.is_err() { self.current_buffered @@ -257,4 +401,19 @@ where } result } + + pub async fn shutdown(mut self) { + if self.closed.swap(true, Ordering::Relaxed) { + return; + } + + let mut handles = Vec::with_capacity(self.shards.len()); + for shard in self.shards.drain(..) 
{ + handles.push(shard.shutdown()); + } + + futures::future::join_all(handles).await; + let _ = self._join_handle.await; + debug_assert_eq!(self.current_buffered.load(Ordering::Relaxed), 0); + } } diff --git a/core/sdk/src/clients/send_mode.rs b/core/sdk/src/clients/send_mode.rs deleted file mode 100644 index 63145db3..00000000 --- a/core/sdk/src/clients/send_mode.rs +++ /dev/null @@ -1,131 +0,0 @@ -use std::sync::{atomic::{AtomicUsize, Ordering}, Arc}; - -use iggy_common::{Identifier, IggyByteSize, IggyDuration, IggyError, IggyMessage, Partitioning}; -use tokio::task::JoinHandle; -use tracing::error; - -use super::producer::ProducerCore; - -#[derive(Debug, Clone, Default, PartialEq)] -pub enum SendMode { - #[default] - Sync, - Background, -} - -#[derive(Debug, Clone)] -/// Determines how the `send_messages` API should behave when problem is encountered -pub enum BackpressureMode { - /// Block until the send succeeds - Block, - /// Block with a timeout, after which the send fails - BlockWithTimeout(IggyDuration), - /// Fail immediately without retrying - FailImmediately, -} - -#[derive(Debug, Clone)] -pub struct BackgroundConfig { - pub max_in_flight: usize, - pub in_flight_timeout: Option<IggyDuration>, - pub batch_size: Option<usize>, - pub failure_mode: BackpressureMode, - pub buffer_size: Option<IggyByteSize>, // rename: maximum_buffer_size -} - -pub struct shardMessage { - pub stream: Arc<Identifier>, - pub topic: Arc<Identifier>, - pub messages: Vec<IggyMessage>, - pub partitioning: Option<Arc<Partitioning>>, -} - -struct shard { - core: Arc<ProducerCore>, - tx: flume::Sender<shardMessage>, - _join_handle: JoinHandle<()>, -} - -impl shard { - fn new(id: usize, producer_core: Arc<ProducerCore>) -> Self { - let (tx, rx) = flume::bounded::<shardMessage>(10); // todo добавить размер в конфигурацию - let core = producer_core.clone(); - let handle = tokio::spawn(async move { - while let Ok(message) = rx.recv_async().await { - // todo поменять на match - 
core.send_internal(&message.stream, &message.topic, message.messages, message.partitioning).await.map_err(|e| { - error!("{e}"); - }).unwrap(); - } - }); - Self { - core: producer_core, - tx, - _join_handle: handle, - } - } - - async fn send( - &self, - message: shardMessage, - ) -> Result<(), IggyError> { - self.tx.send_async(message).await.map_err(|_| IggyError::BackgroundSendError) - } - - async fn send_timeout(&self, message: shardMessage, timeout: IggyDuration) -> Result<(), IggyError> { - match tokio::time::timeout(timeout.get_duration(), self.tx.send_async(message)).await { - Ok(Ok(())) => Ok(()), - Ok(Err(e)) => Err(IggyError::BackgroundSendTimeout), - Err(_) => Err(IggyError::BackgroundSendTimeout) - } - } - - fn send_with_fail(&self, message: shardMessage) -> Result<(), IggyError> { - self.tx.try_send(message).map_err(|_| IggyError::BackgroundSendError) - } -} - -pub struct Dispatcher { - config: Arc<BackgroundConfig>, - sender: flume::Sender<shardMessage>, - _join_handle: JoinHandle<()>, -} - -impl Dispatcher { - pub fn new(core: Arc<ProducerCore>, num_shards: usize, config: Arc<BackgroundConfig>) -> Self { - let mut shards = Vec::with_capacity(num_shards); - for i in 0..num_shards { - shards.push(shard::new(i, core.clone())); - } - - let (tx, rx) = flume::bounded::<shardMessage>(0); - let sent = AtomicUsize::new(0); - let inner_config = config.clone(); - let handle = tokio::spawn(async move { - loop { - if let Ok(msg) = rx.recv_async().await { - let ix = sent.fetch_add(1, Ordering::SeqCst) % num_shards; - let shard = shards.get(ix).unwrap(); - let result = match inner_config.failure_mode { - BackpressureMode::Block => shard.send(msg).await, - BackpressureMode::BlockWithTimeout(t) => shard.send_timeout(msg, t).await, - BackpressureMode::FailImmediately => shard.send_with_fail(msg), - }; - if let Err(e) = result { - // todo добавить канал для ошибок - error!("{}", e); - } - } - } - }); - Self { - config, - sender: tx, - _join_handle: handle, - } - } - - 
pub async fn dispatch(&self, msg: shardMessage) -> Result<(), flume::SendError<shardMessage>> { - self.sender.send_async(msg).await - } -}
