This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch feat/add-background-send in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 1685b8798cd95f7b1b9db2784866d0e0f79d47ed Author: haze518 <[email protected]> AuthorDate: Thu May 29 06:06:08 2025 +0600 del --- core/sdk/src/clients/producer.rs | 133 +++++++++++++++++++++------------------ 1 file changed, 71 insertions(+), 62 deletions(-) diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs index ba2ccc94..d9355358 100644 --- a/core/sdk/src/clients/producer.rs +++ b/core/sdk/src/clients/producer.rs @@ -1,4 +1,4 @@ -use super::send_mode::{shardMessage, BackgroundConfig, BackpressureMode, Dispatcher, SendMode}; +use super::send_mode::{BackgroundConfig, BackpressureMode, Dispatcher, SendMode, shardMessage}; /* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -36,6 +36,20 @@ use tokio::task::JoinHandle; use tokio::time::{Interval, sleep}; use tracing::{error, info, trace, warn}; +pub trait SendStrategy: Send + Sync + 'static { + fn send_batch( + &self, + stream: &Identifier, + topic: &Identifier, + msgs: Vec<IggyMessage>, + partitioning: Option<Arc<Partitioning>>, + ) -> impl Future<Output = Result<(), IggyError>> + Send; +} + +pub trait AsyncSendStrategy: SendStrategy { + fn flush(&self) -> Result<(), IggyError>; +} + pub struct ProducerCore { initialized: AtomicBool, can_send: Arc<AtomicBool>, @@ -70,10 +84,11 @@ pub struct ProducerCore { impl ProducerCore { pub async fn init(&self) -> Result<(), IggyError> { - if self.initialized.compare_exchange( - false, true, - Ordering::SeqCst, Ordering::SeqCst, - ).is_err() { + if self + .initialized + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_err() + { return Ok(()); } @@ -376,7 +391,6 @@ impl ProducerCore { } } - pub trait ErrorCallback: Send + Sync + Debug { fn call(&self, error: IggyError, messages: Vec<IggyMessage>); } @@ -415,37 +429,37 @@ impl IggyProducer { error_callback: Option<Arc<dyn ErrorCallback>>, ) -> Self { let core = Arc::new(ProducerCore { - initialized: AtomicBool::new(true), - client: Arc::new(client), - can_send: Arc::new(AtomicBool::new(true)), - stream_id: Arc::new(stream), - stream_name, - topic_id: Arc::new(topic), - topic_name, - batch_length, - partitioning: partitioning.map(Arc::new), - encryptor, - partitioner, - linger_time_micros: linger_time.map_or(0, |i| i.as_micros()), - create_stream_if_not_exists, - create_topic_if_not_exists, - topic_partitions_count, - topic_replication_factor, - topic_message_expiry, - topic_max_size, - default_partitioning: Arc::new(Partitioning::balanced()), - can_send_immediately: linger_time.is_none(), - last_sent_at: Arc::new(AtomicU64::new(0)), - send_retries_count, - send_retries_interval, - _join_handle: None, - sema: Arc::new(Semaphore::new(10)), - sender: None, - error_callback, - shard_number: default_shard_count(), - }); + initialized: AtomicBool::new(true), + client: Arc::new(client), + can_send: Arc::new(AtomicBool::new(true)), + stream_id: Arc::new(stream), + stream_name, + topic_id: Arc::new(topic), + topic_name, + batch_length, + partitioning: partitioning.map(Arc::new), + encryptor, + partitioner, + linger_time_micros: linger_time.map_or(0, |i| i.as_micros()), + create_stream_if_not_exists, + create_topic_if_not_exists, + topic_partitions_count, + topic_replication_factor, + topic_message_expiry, + topic_max_size, + default_partitioning: Arc::new(Partitioning::balanced()), + can_send_immediately: linger_time.is_none(), + last_sent_at: Arc::new(AtomicU64::new(0)), + send_retries_count, + send_retries_interval, + _join_handle: None, + sema: Arc::new(Semaphore::new(10)), + sender: None, + error_callback, + shard_number: default_shard_count(), + }); let num_shards = default_shard_count(); // todo потом заменмить на норм значепние - let config = BackgroundConfig{ + let config = BackgroundConfig { max_in_flight: 4, in_flight_timeout: None, batch_size: None, @@ -489,24 +503,23 @@ impl IggyProducer { match &self.send_mode { SendMode::Sync => { - self.core.send_internal(&stream_id, &topic_id, messages, None).await - - } - SendMode::Background => { - match &self.dispatcher { - Some(disp) => { - disp.dispatch(shardMessage{ - messages, - stream: stream_id, - topic: topic_id, - partitioning: None, - }).await.map_err(|err| IggyError::BackgroundSendError) - } - None => Ok(()) - } + self.core + .send_internal(&stream_id, &topic_id, messages, None) + .await } + SendMode::Background => match &self.dispatcher { + Some(disp) => disp + .dispatch(shardMessage { + messages, + stream: stream_id, + topic: topic_id, + partitioning: None, + }) + .await + .map_err(|err| IggyError::BackgroundSendError), + None => Ok(()), + }, } - } pub async fn send_one(&self, message: IggyMessage) -> Result<(), IggyError> { @@ -526,7 +539,9 @@ impl IggyProducer { let stream_id = &self.core.stream_id; let topic_id = &self.core.topic_id; - self.core.send_internal(stream_id, topic_id, messages, partitioning).await + self.core + .send_internal(stream_id, topic_id, messages, partitioning) + .await } pub async fn send_to( @@ -541,7 +556,9 @@ impl IggyProducer { return Ok(()); } - self.core.send_internal(&stream, &topic, messages, partitioning).await + self.core + .send_internal(&stream, &topic, messages, partitioning) + .await } } @@ -549,11 +566,3 @@ fn default_shard_count() -> usize { let cpus = num_cpus::get(); cpus.clamp(2, 16) } - -fn default_sharder(msg: &IggyMessage) -> &[u8] { - if let Some(h) = &msg.user_headers { - return &h[..h.len().min(16)]; - } - - &msg.payload[..msg.payload.len().min(16)] -}
