This is an automated email from the ASF dual-hosted git repository.
bashirbekov pushed a commit to branch feat/add-background-send
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/feat/add-background-send by
this push:
new 77c25b83 del
77c25b83 is described below
commit 77c25b83d1bb41d6efccf0e58d48240ef12696bf
Author: haze518 <[email protected]>
AuthorDate: Mon Jun 2 10:10:28 2025 +0600
del
---
core/sdk/src/clients/producer.rs | 166 +++++++++++++++-------------
core/sdk/src/clients/producer_builder.rs | 36 +++---
core/sdk/src/clients/producer_dispatcher.rs | 120 +++++++-------------
3 files changed, 151 insertions(+), 171 deletions(-)
diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index ed22d5a3..cb0b77ce 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -17,6 +17,7 @@
*/
use super::{MAX_BATCH_LENGTH, ORDERING};
+use super::producer_dispatcher::{BackgroundConfig, BalancedSharding,
ProducerDispatcher, Sharding};
use bytes::Bytes;
use futures_util::StreamExt;
use iggy_binary_protocol::Client;
@@ -25,9 +26,7 @@ use iggy_common::{
CompressionAlgorithm, DiagnosticEvent, EncryptorKind, IdKind, Identifier,
IggyDuration,
IggyError, IggyExpiry, IggyMessage, IggyTimestamp, MaxTopicSize,
Partitioner, Partitioning,
};
-use super::producer_dispatcher::{ProducerDispatcher, BalancedSharding,
Sharding};
use std::fmt::Debug;
-use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicBool, AtomicU64};
@@ -49,6 +48,11 @@ pub trait AsyncSendStrategy: SendStrategy {
fn flush(&self) -> Result<(), IggyError>;
}
+pub(crate) enum SendMode {
+ Direct,
+ Chunked,
+}
+
pub struct ProducerCore {
initialized: AtomicBool,
can_send: Arc<AtomicBool>,
@@ -184,74 +188,77 @@ impl ProducerCore {
});
}
-// если не надо будет разбивать на чанки, то можем не кидать msg в ошибке
-// достаточно будет добавить функцию, которая будет принимать массив.
-// Тогда мы сможем передавать массив для отправки туда, при этом сохраняя
доступ до сообщений.
-// Надо будет подумать над этим.
-//
-pub(crate) async fn send_internal(
- &self,
- stream: &Identifier,
- topic: &Identifier,
- mut msgs: Vec<IggyMessage>,
- partitioning: Option<Arc<Partitioning>>,
- split_chunks: bool
-) -> Result<(), IggyError> {
- if msgs.is_empty() {
- return Ok(());
- }
-
- if let Err(err) = self.encrypt_messages(&mut msgs) {
- return Err(IggyError::ProducerSendFailed {
- cause: err.to_string(),
- failed: Arc::new(msgs),
- });
- }
+ pub(crate) async fn send_internal(
+ &self,
+ stream: &Identifier,
+ topic: &Identifier,
+ mut msgs: Vec<IggyMessage>,
+ partitioning: Option<Arc<Partitioning>>,
+ mode: SendMode,
+ ) -> Result<(), IggyError> {
+ if msgs.is_empty() {
+ return Ok(());
+ }
- let part = match self.get_partitioning(stream, topic, &msgs,
partitioning.clone()) {
- Ok(p) => p,
- Err(err) => {
+ if let Err(err) = self.encrypt_messages(&mut msgs) {
return Err(IggyError::ProducerSendFailed {
- cause: err.to_string(),
+ cause: err.to_string(),
failed: Arc::new(msgs),
});
}
- };
-
- if !self.can_send_immediately && self.linger_time_micros > 0 {
- Self::wait_before_sending(self.linger_time_micros,
- self.last_sent_at.load(ORDERING)).await;
- }
-
- if !split_chunks {
- self.try_send_messages(stream, topic, &part, &mut
msgs).await.map_err(|err| IggyError::ProducerSendFailed {
- cause: err.to_string(),
- failed: Arc::new(msgs),
- })?;
- self.last_sent_at.store(IggyTimestamp::now().into(), ORDERING);
- return Ok(())
- }
- let max = self.batch_length.unwrap_or(MAX_BATCH_LENGTH);
+ let part = match self.get_partitioning(stream, topic, &msgs,
partitioning.clone()) {
+ Ok(p) => p,
+ Err(err) => {
+ return Err(IggyError::ProducerSendFailed {
+ cause: err.to_string(),
+ failed: Arc::new(msgs),
+ });
+ }
+ };
- let mut index = 0;
- while index < msgs.len() {
- let end = (index + max).min(msgs.len());
- let chunk = &mut msgs[index..end];
+ match mode {
+ SendMode::Direct => {
+ self.try_send_messages(stream, topic, &part, &mut msgs)
+ .await
+ .map_err(|err| IggyError::ProducerSendFailed {
+ cause: err.to_string(),
+ failed: Arc::new(msgs),
+ })?;
+ self.last_sent_at
+ .store(IggyTimestamp::now().into(), ORDERING);
+ }
+ SendMode::Chunked => {
+ if !self.can_send_immediately && self.linger_time_micros > 0 {
+ Self::wait_before_sending(
+ self.linger_time_micros,
+ self.last_sent_at.load(ORDERING),
+ )
+ .await;
+ }
- if let Err(err) = self.try_send_messages(stream, topic, &part,
chunk).await {
- let failed_tail = msgs.split_off(index);
- return Err(IggyError::ProducerSendFailed {
- cause: err.to_string(),
- failed: Arc::new(failed_tail),
- });
+ let max = self.batch_length.unwrap_or(MAX_BATCH_LENGTH);
+ let mut index = 0;
+ while index < msgs.len() {
+ let end = (index + max).min(msgs.len());
+ let chunk = &mut msgs[index..end];
+
+ if let Err(err) = self.try_send_messages(stream, topic,
&part, chunk).await {
+ let failed_tail = msgs.split_off(index);
+ return Err(IggyError::ProducerSendFailed {
+ cause: err.to_string(),
+ failed: Arc::new(failed_tail),
+ });
+ }
+ self.last_sent_at
+ .store(IggyTimestamp::now().into(), ORDERING);
+ index = end;
+ }
+ }
}
- self.last_sent_at.store(IggyTimestamp::now().into(), ORDERING);
- index = end;
- }
- Ok(())
-}
+ Ok(())
+ }
async fn try_send_messages(
&self,
@@ -424,11 +431,11 @@ pub(crate) async fn send_internal(
}
pub struct ErrorCtx {
- pub cause: String,
- pub stream: Arc<Identifier>,
- pub topic: Arc<Identifier>,
+ pub cause: String,
+ pub stream: Arc<Identifier>,
+ pub topic: Arc<Identifier>,
pub partitioning: Option<Arc<Partitioning>>,
- pub messages: Arc<Vec<IggyMessage>>,
+ pub messages: Arc<Vec<IggyMessage>>,
}
pub trait ErrorCallback: Send + Sync + Debug + 'static {
@@ -446,7 +453,8 @@ impl std::fmt::Debug for LogErrorCallback {
impl ErrorCallback for LogErrorCallback {
fn call(&self, ctx: ErrorCtx) -> impl Future<Output = ()> + Send {
async move {
- let partitioning = ctx.partitioning
+ let partitioning = ctx
+ .partitioning
.as_ref()
.map(|p| format!("{:?}", p))
.unwrap_or_else(|| "None".to_string());
@@ -463,15 +471,12 @@ impl ErrorCallback for LogErrorCallback {
}
}
-
unsafe impl<S: Sharding, E: ErrorCallback> Send for IggyProducer<S, E> {}
unsafe impl<S: Sharding, E: ErrorCallback> Sync for IggyProducer<S, E> {}
pub struct IggyProducer<S: Sharding, E: ErrorCallback> {
core: Arc<ProducerCore>,
dispatcher: Option<ProducerDispatcher<S, E>>,
-
- _phantom: PhantomData<E>,
}
impl<S, E> IggyProducer<S, E>
@@ -527,10 +532,8 @@ where
send_retries_interval,
});
let dispatcher = match background_config {
- Some(config) => {
- Some(ProducerDispatcher::new(core.clone(), config))
- }
- None => None
+ Some(config) => Some(ProducerDispatcher::new(core.clone(),
config)),
+ None => None,
};
Self {
@@ -565,7 +568,11 @@ where
match &self.dispatcher {
Some(disp) => disp.dispatch(messages, stream_id, topic_id,
None).await,
- None => self.core.send_internal(&stream_id, &topic_id, messages,
None, true).await,
+ None => {
+ self.core
+ .send_internal(&stream_id, &topic_id, messages, None,
SendMode::Chunked)
+ .await
+ }
}
}
@@ -587,8 +594,15 @@ where
let topic_id = self.core.topic_id.clone();
match &self.dispatcher {
- Some(disp) => disp.dispatch(messages, stream_id, topic_id,
partitioning).await,
- None => self.core.send_internal(&stream_id, &topic_id, messages,
partitioning, true).await,
+ Some(disp) => {
+ disp.dispatch(messages, stream_id, topic_id, partitioning)
+ .await
+ }
+ None => {
+ self.core
+ .send_internal(&stream_id, &topic_id, messages,
partitioning, SendMode::Chunked)
+ .await
+ }
}
}
@@ -606,7 +620,7 @@ where
// todo add send via dispatcher
self.core
- .send_internal(&stream, &topic, messages, partitioning, true)
+ .send_internal(&stream, &topic, messages, partitioning,
SendMode::Chunked)
.await
}
diff --git a/core/sdk/src/clients/producer_builder.rs
b/core/sdk/src/clients/producer_builder.rs
index 89843800..3cadd295 100644
--- a/core/sdk/src/clients/producer_builder.rs
+++ b/core/sdk/src/clients/producer_builder.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use super::producer_dispatcher::{BackgroundConfig, Sharding};
use super::MAX_BATCH_LENGTH;
use crate::prelude::{IggyProducer, ErrorCallback};
use iggy_binary_protocol::Client;
@@ -24,8 +25,18 @@ use iggy_common::{
};
use std::sync::Arc;
+pub struct Unset;
+pub struct Sync;
+pub struct Bg;
+
+
+
#[derive(Debug)]
-pub struct IggyProducerBuilder {
+pub struct IggyProducerBuilder<S, E>
+where
+ S: Sharding,
+ E: ErrorCallback,
+{
client: IggySharedMut<Box<dyn Client>>,
stream: Identifier,
stream_name: String,
@@ -44,10 +55,14 @@ pub struct IggyProducerBuilder {
send_retries_interval: Option<IggyDuration>,
topic_message_expiry: IggyExpiry,
topic_max_size: MaxTopicSize,
- error_callback: Option<Arc<dyn ErrorCallback>>
+ background_config: Option<BackgroundConfig<S, E>>,
}
-impl IggyProducerBuilder {
+impl<S, E> IggyProducerBuilder<S, E>
+where
+ S: Sharding,
+ E: ErrorCallback,
+{
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
client: IggySharedMut<Box<dyn Client>>,
@@ -77,7 +92,7 @@ impl IggyProducerBuilder {
topic_max_size: MaxTopicSize::ServerDefault,
send_retries_count: Some(3),
send_retries_interval: Some(IggyDuration::ONE_SECOND),
- error_callback: None,
+ background_config: None,
}
}
@@ -103,6 +118,10 @@ impl IggyProducerBuilder {
}
}
+ pub fn with_background_config(self) -> Self {
+
+ }
+
/// Clears the batch size.
pub fn without_batch_length(self) -> Self {
Self {
@@ -127,13 +146,6 @@ impl IggyProducerBuilder {
}
}
- pub fn error_callback(self, cb: Arc<dyn ErrorCallback>) -> Self {
- Self {
- error_callback: Some(cb),
- ..self
- }
- }
-
/// Sets the encryptor for encrypting the messages' payloads.
pub fn encryptor(self, encryptor: Arc<EncryptorKind>) -> Self {
Self {
@@ -258,8 +270,6 @@ impl IggyProducerBuilder {
self.topic_max_size,
self.send_retries_count,
self.send_retries_interval,
- self.send_mode,
- self.error_callback,
)
}
}
diff --git a/core/sdk/src/clients/producer_dispatcher.rs
b/core/sdk/src/clients/producer_dispatcher.rs
index 818c958c..ecc19212 100644
--- a/core/sdk/src/clients/producer_dispatcher.rs
+++ b/core/sdk/src/clients/producer_dispatcher.rs
@@ -1,13 +1,18 @@
-use std::{sync::{
- atomic::{AtomicBool, AtomicUsize, Ordering}, Arc
-}, time::Duration};
+use std::{
+ sync::{
+ Arc,
+ atomic::{AtomicBool, AtomicUsize, Ordering},
+ },
+ time::Duration,
+};
+use futures::FutureExt;
use iggy_common::{
Identifier, IggyByteSize, IggyDuration, IggyError, IggyMessage,
Partitioning, Sizeable,
};
use tokio::{sync::Notify, task::JoinHandle};
use tracing::error;
-use futures::FutureExt;
+use super::producer::{ErrorCtx, ProducerCore, SendMode};
use crate::prelude::ErrorCallback;
@@ -25,18 +30,15 @@ pub enum BackpressureMode {
#[derive(Debug, Clone)]
pub struct BackgroundConfig<S: Sharding, E: ErrorCallback> {
pub max_in_flight: usize,
- pub in_flight_timeout: Option<IggyDuration>,
pub batch_size: Option<usize>,
pub batch_length: Option<usize>,
pub failure_mode: BackpressureMode,
- pub buffer_size: Option<IggyByteSize>,
- pub linger_time: Option<IggyDuration>,
+ pub max_buffer_size: Option<IggyByteSize>,
+ pub linger_time: IggyDuration,
pub error_callback: Arc<E>,
pub sharding: Box<S>,
}
-use super::{producer::{ErrorCtx, ProducerCore}};
-
#[derive(Debug)]
pub struct ShardMessage {
pub stream: Arc<Identifier>,
@@ -64,7 +66,7 @@ pub struct Shard {
}
impl Shard {
- pub fn new_(
+ pub fn new(
core: Arc<ProducerCore>,
global_buffer: Arc<AtomicUsize>,
config: Arc<BackgroundConfig<impl Sharding, impl ErrorCallback>>,
@@ -75,10 +77,11 @@ impl Shard {
let handle = tokio::spawn(async move {
let mut buffer = Vec::new();
- let mut buffer_bytes = 0usize;
+ let mut buffer_bytes = 0;
let mut last_flush = tokio::time::Instant::now();
loop {
+ let deadline = last_flush + config.linger_time.get_duration();
tokio::select! {
maybe_msg = rx.recv_async() => {
match maybe_msg {
@@ -99,7 +102,7 @@ impl Shard {
Err(_) => break,
}
}
- _ =
tokio::time::sleep(config.linger_time.map_or(Duration::from_secs(u64::MAX), |d|
d.get_duration())) => {
+ _ = tokio::time::sleep_until(deadline) => {
if !buffer.is_empty() {
Self::flush_buffer(&core, &mut buffer, &mut
buffer_bytes, &global_buffer, &err_sender, &notify).await;
last_flush = tokio::time::Instant::now();
@@ -108,6 +111,11 @@ impl Shard {
}
}
});
+
+ Self {
+ tx,
+ _handle: handle,
+ }
}
async fn flush_buffer(
@@ -121,7 +129,13 @@ impl Shard {
for msg in buffer.drain(..) {
let size = msg.get_size_bytes().as_bytes_usize();
if let Err(err) = core
- .send_internal(&msg.stream, &msg.topic, msg.messages,
msg.partitioning.clone(), false)
+ .send_internal(
+ &msg.stream,
+ &msg.topic,
+ msg.messages,
+ msg.partitioning.clone(),
+ SendMode::Direct,
+ )
.await
{
if let IggyError::ProducerSendFailed { failed, cause } = &err {
@@ -146,50 +160,6 @@ impl Shard {
*buffer_bytes = 0;
}
- pub fn new(
- core: Arc<ProducerCore>,
- current_buffered_size: Arc<AtomicUsize>,
- notify: Arc<Notify>,
- err_sender: flume::Sender<ErrorCtx>,
- ) -> Self {
- let (tx, rx) = flume::bounded::<ShardMessage>(10); // use from config
- let handle = tokio::spawn(async move {
- while let Ok(msg) = rx.recv_async().await {
- let size = msg.get_size_bytes();
- if let Err(err) = core
- .send_internal(
- &msg.stream,
- &msg.topic,
- msg.messages,
- msg.partitioning.clone(),
- )
- .await
- {
- if let IggyError::ProducerSendFailed { failed, cause } =
&err {
- let ctx = ErrorCtx {
- cause: cause.clone(),
- stream: msg.stream,
- topic: msg.topic,
- partitioning: msg.partitioning,
- messages: failed.clone(),
- };
- if err_sender.send_async(ctx).await.is_err() {
- tracing::warn!("error-queue receiver has been
dropped; lost error report");
- }
- } else {
- tracing::error!("background send failed: {err}");
- }
- }
- current_buffered_size.fetch_sub(size.as_bytes_usize(),
Ordering::Relaxed);
- notify.notify_waiters();
- }
- });
- Self {
- tx,
- _handle: handle,
- }
- }
-
async fn send_with_block(&self, message: ShardMessage) -> Result<(),
IggyError> {
self.tx.send_async(message).await.map_err(|e| {
error!("Failed to send_with_block: {e}");
@@ -265,7 +235,7 @@ impl Sharding for BalancedSharding {
pub struct ProducerDispatcher<S: Sharding, E: ErrorCallback> {
core: Arc<ProducerCore>,
shards: Vec<Shard>,
- current_buffered: Arc<AtomicUsize>, // todo добавить аналог, но по
length(in-flight)
+ global_buffer: Arc<AtomicUsize>,
notify: Arc<Notify>,
config: Arc<BackgroundConfig<S, E>>,
closed: AtomicBool,
@@ -277,13 +247,9 @@ where
S: Sharding,
E: ErrorCallback,
{
- pub fn new(
- core: Arc<ProducerCore>,
- config: BackgroundConfig<S, E>,
- ) -> Self {
+ pub fn new(core: Arc<ProducerCore>, config: BackgroundConfig<S, E>) ->
Self {
let mut shards = Vec::with_capacity(config.max_in_flight);
let current_buffered_size = Arc::new(AtomicUsize::new(0));
- // let current_buffered_length = Arc::new(AtomicUsize::new(0));
let notify = Arc::new(Notify::new());
let config = Arc::new(config);
@@ -305,6 +271,7 @@ where
shards.push(Shard::new(
core.clone(),
current_buffered_size.clone(),
+ config.clone(),
notify.clone(),
err_tx.clone(),
));
@@ -313,7 +280,7 @@ where
Self {
core,
shards,
- current_buffered: current_buffered_size,
+ global_buffer: current_buffered_size,
config,
notify,
closed: AtomicBool::new(false),
@@ -340,8 +307,8 @@ where
};
let batch_bytes = shard_message.get_size_bytes();
- let mut reserved = self.current_buffered.load(Ordering::Relaxed);
- if let Some(buffer_size) = &self.config.buffer_size {
+ let mut reserved = self.global_buffer.load(Ordering::Relaxed);
+ if let Some(buffer_size) = &self.config.max_buffer_size {
if batch_bytes.as_bytes_usize() > buffer_size.as_bytes_usize() {
return Err(IggyError::BackgroundSendBufferOverflow);
}
@@ -368,7 +335,7 @@ where
}
};
}
- match self.current_buffered.compare_exchange(
+ match self.global_buffer.compare_exchange(
reserved,
reserved + batch_bytes.as_bytes_usize(),
Ordering::AcqRel,
@@ -386,20 +353,9 @@ where
&shard_message.stream,
&shard_message.topic,
);
- let shard = self.shards.get(shard_ix).unwrap();
-
- let result = match self.config.failure_mode {
- BackpressureMode::Block =>
shard.send_with_block(shard_message).await,
- BackpressureMode::BlockWithTimeout(t) => {
- shard.send_with_timeout(shard_message, t).await
- }
- BackpressureMode::FailImmediately =>
shard.send_with_fail(shard_message),
- };
- if result.is_err() {
- self.current_buffered
- .fetch_sub(batch_bytes.as_bytes_usize(), Ordering::Relaxed);
- }
- result
+ debug_assert!(shard_ix < self.shards.len());
+ let shard = &self.shards[shard_ix];
+ shard.send_with_block(shard_message).await
}
pub async fn shutdown(mut self) {
@@ -414,6 +370,6 @@ where
futures::future::join_all(handles).await;
let _ = self._join_handle.await;
- debug_assert_eq!(self.current_buffered.load(Ordering::Relaxed), 0);
+ debug_assert_eq!(self.global_buffer.load(Ordering::Relaxed), 0);
}
}