This is an automated email from the ASF dual-hosted git repository.
bashirbekov pushed a commit to branch feat/add-background-send
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/feat/add-background-send by
this push:
new 0ae1eddd del
0ae1eddd is described below
commit 0ae1eddd38ea670a34756b45e6a54c27330186c7
Author: haze518 <[email protected]>
AuthorDate: Tue May 27 07:29:34 2025 +0600
del
---
core/sdk/src/clients/producer.rs | 552 ++++++++++++++++++++++-----------------
1 file changed, 316 insertions(+), 236 deletions(-)
diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index 9ba3026d..e69c4808 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -19,6 +19,7 @@ use super::send_mode::{BackpressureMode, SendMode};
use super::{MAX_BATCH_LENGTH, ORDERING};
use bytes::Bytes;
+use dashmap::DashMap;
use futures_util::StreamExt;
use iggy_binary_protocol::Client;
use iggy_common::locking::{IggySharedMut, IggySharedMutFn};
@@ -28,41 +29,17 @@ use iggy_common::{
};
use std::fmt::Debug;
use std::sync::Arc;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicBool, AtomicU64};
use std::time::{Duration, Instant};
use tokio::sync::Semaphore;
use tokio::task::JoinHandle;
use tokio::time::{Interval, sleep};
use tracing::{error, info, trace, warn};
-use dashmap::DashMap;
-use std::sync::atomic::AtomicUsize;
-use std::sync::atomic::Ordering;
-
-struct shard {
- tx: flume::Sender<Vec<IggyMessage>>,
- _join_handle: JoinHandle<()>,
-}
-
-impl shard {
- fn new(id: usize) -> Self {
- let (tx, rx) = flume::bounded::<Vec<IggyMessage>>(10); // todo
добавить размер в конфигурацию
- let handle = tokio::spawn(async move {
- while let Ok(message) = rx.recv_async().await { // todo поменять
на match
-
- }
- });
- }
-}
-
-pub trait ErrorCallback: Send + Sync + Debug {
- fn call(&self, error: IggyError, messages: Vec<IggyMessage>);
-}
-unsafe impl Send for IggyProducer {}
-unsafe impl Sync for IggyProducer {}
-
-pub struct IggyProducer {
- initialized: bool,
+struct producerCore {
+ initialized: AtomicBool,
can_send: Arc<AtomicBool>,
client: Arc<IggySharedMut<Box<dyn Client>>>,
stream_id: Arc<Identifier>,
@@ -92,79 +69,14 @@ pub struct IggyProducer {
sender: Option<Arc<flume::Sender<Vec<IggyMessage>>>>,
error_callback: Option<Arc<dyn ErrorCallback>>,
shard_number: usize,
- // todo добавить ShardStrategy
}
-impl IggyProducer {
- #[allow(clippy::too_many_arguments)]
- pub(crate) fn new(
- client: IggySharedMut<Box<dyn Client>>,
- stream: Identifier,
- stream_name: String,
- topic: Identifier,
- topic_name: String,
- batch_length: Option<usize>,
- partitioning: Option<Partitioning>,
- encryptor: Option<Arc<EncryptorKind>>,
- partitioner: Option<Arc<dyn Partitioner>>,
- linger_time: Option<IggyDuration>,
- create_stream_if_not_exists: bool,
- create_topic_if_not_exists: bool,
- topic_partitions_count: u32,
- topic_replication_factor: Option<u8>,
- topic_message_expiry: IggyExpiry,
- topic_max_size: MaxTopicSize,
- send_retries_count: Option<u32>,
- send_retries_interval: Option<IggyDuration>,
- send_mode: SendMode,
- error_callback: Option<Arc<dyn ErrorCallback>>,
- ) -> Self {
- Self {
- initialized: false,
- client: Arc::new(client),
- can_send: Arc::new(AtomicBool::new(true)),
- stream_id: Arc::new(stream),
- stream_name,
- topic_id: Arc::new(topic),
- topic_name,
- batch_length,
- partitioning: partitioning.map(Arc::new),
- send_mode: Arc::new(send_mode),
- encryptor,
- partitioner,
- linger_time_micros: linger_time.map_or(0, |i| i.as_micros()),
- create_stream_if_not_exists,
- create_topic_if_not_exists,
- topic_partitions_count,
- topic_replication_factor,
- topic_message_expiry,
- topic_max_size,
- default_partitioning: Arc::new(Partitioning::balanced()),
- can_send_immediately: linger_time.is_none(),
- last_sent_at: Arc::new(AtomicU64::new(0)),
- send_retries_count,
- send_retries_interval,
- _join_handle: None,
- sema: Arc::new(Semaphore::new(10)),
- sender: None,
- error_callback,
- shard_number: default_shard_count(),
- }
- }
-
- pub fn stream(&self) -> &Identifier {
- &self.stream_id
- }
-
- pub fn topic(&self) -> &Identifier {
- &self.topic_id
- }
-
- /// Initializes the producer by subscribing to diagnostic events, creating
the stream and topic if they do not exist etc.
- ///
- /// Note: This method must be invoked before producing messages.
- pub async fn init(&mut self) -> Result<(), IggyError> {
- if self.initialized {
+impl producerCore {
+ pub async fn init(&self) -> Result<(), IggyError> {
+ if self.initialized.compare_exchange(
+ false, true,
+ Ordering::SeqCst, Ordering::SeqCst,
+ ).is_err() {
return Ok(());
}
@@ -231,26 +143,14 @@ impl IggyProducer {
let client = self.client.clone();
let send_retries_count = self.send_retries_count.clone();
let send_retries_interval = self.send_retries_interval.clone();
- let can_send = self.can_send.clone();
let send_batches = AtomicUsize::new(0);
let num_shard = self.shard_number;
- // let sema = self.sema.clone();
+ let can_send = self.can_send.clone();
let handle = tokio::spawn(async move {
while let Ok(mut batch) = rx.recv_async().await {
- // let sema = sema.clone();
- // let partitioner = partitioner.clone();
- // let stream_id = stream_id.clone();
- // let topic_id = topic_id.clone();
- // let partitioning = partitioning.clone();
- // let default_partitioning = default_partitioning.clone();
- // let client = client.clone();
- // let can_send = can_send.clone();
-
// if round_robin
let shard_ix = send_batches.fetch_add(1, Ordering::SeqCst) %
num_shard;
-
-
let partitioning = get_partitioning(
&partitioner,
&stream_id,
@@ -273,34 +173,10 @@ impl IggyProducer {
)
.await
.unwrap();
- // tokio::spawn(async move {
- // let _permit = sema.acquire().await.unwrap();
- // let partitioning = get_partitioning(
- // &partitioner,
- // &stream_id,
- // &topic_id,
- // &batch,
- // partitioning.clone(),
- // &partitioning,
- // default_partitioning.clone(),
- // )
- // .unwrap();
- // try_send_messages(
- // client.clone(),
- // send_retries_count,
- // send_retries_interval,
- // can_send.clone(),
- // &stream_id,
- // &topic_id,
- // &partitioning,
- // &mut batch,
- // ).await.unwrap();
- // });
}
});
- self._join_handle = Some(handle);
- self.sender = Some(Arc::new(tx));
- self.initialized = true;
+ // self._join_handle = Some(handle);
+ // self.sender = Some(Arc::new(tx));
info!(
"Producer has been initialized for stream: {} and topic: {}.",
self.stream_id.clone(),
@@ -346,12 +222,314 @@ impl IggyProducer {
});
}
+ async fn send_internal(
+ &self,
+ stream: &Identifier,
+ topic: &Identifier,
+ mut msgs: Vec<IggyMessage>,
+ partitioning: Option<Arc<Partitioning>>,
+ ) -> Result<(), IggyError> {
+ if msgs.is_empty() {
+ return Ok(());
+ }
+
+ self.encrypt_messages(&mut msgs)?;
+
+ let part = self.get_partitioning(stream, topic, &msgs, partitioning)?;
+
+ let max = self.batch_length.unwrap_or(MAX_BATCH_LENGTH);
+
+ if self.can_send_immediately && self.linger_time_micros > 0 {
+ Self::wait_before_sending(self.linger_time_micros,
self.last_sent_at.load(ORDERING))
+ .await;
+ }
+
+ for chunk in msgs.chunks_mut(max) {
+ self.last_sent_at
+ .store(IggyTimestamp::now().into(), ORDERING);
+ self.try_send_messages(stream, topic, &part, chunk).await?;
+ }
+ Ok(())
+ }
+
+ async fn try_send_messages(
+ &self,
+ stream: &Identifier,
+ topic: &Identifier,
+ partitioning: &Arc<Partitioning>,
+ messages: &mut [IggyMessage],
+ ) -> Result<(), IggyError> {
+ let client = self.client.read().await;
+ let Some(max_retries) = self.send_retries_count else {
+ return client
+ .send_messages(stream, topic, partitioning, messages)
+ .await;
+ };
+
+ if max_retries == 0 {
+ return client
+ .send_messages(stream, topic, partitioning, messages)
+ .await;
+ }
+
+ let mut timer = if let Some(interval) = self.send_retries_interval {
+ let mut timer = tokio::time::interval(interval.get_duration());
+ timer.tick().await;
+ Some(timer)
+ } else {
+ None
+ };
+
+ self.wait_until_connected(max_retries, stream, topic, &mut timer)
+ .await?;
+ self.send_with_retries(
+ max_retries,
+ stream,
+ topic,
+ partitioning,
+ messages,
+ &mut timer,
+ )
+ .await
+ }
+
+ async fn wait_until_connected(
+ &self,
+ max_retries: u32,
+ stream: &Identifier,
+ topic: &Identifier,
+ timer: &mut Option<Interval>,
+ ) -> Result<(), IggyError> {
+ let mut retries = 0;
+ while !self.can_send.load(ORDERING) {
+ retries += 1;
+ if retries > max_retries {
+ error!(
+ "Failed to send messages to topic: {topic}, stream:
{stream} \
+ after {max_retries} retries. Client is disconnected."
+ );
+ return
Err(IggyError::CannotSendMessagesDueToClientDisconnection);
+ }
+
+ error!(
+ "Trying to send messages to topic: {topic}, stream: {stream} \
+ but the client is disconnected. Retrying
{retries}/{max_retries}..."
+ );
+
+ if let Some(timer) = timer.as_mut() {
+ trace!(
+ "Waiting for the next retry to send messages to topic:
{topic}, \
+ stream: {stream} for disconnected client..."
+ );
+ timer.tick().await;
+ }
+ }
+ Ok(())
+ }
+
+ async fn send_with_retries(
+ &self,
+ max_retries: u32,
+ stream: &Identifier,
+ topic: &Identifier,
+ partitioning: &Arc<Partitioning>,
+ messages: &mut [IggyMessage],
+ timer: &mut Option<Interval>,
+ ) -> Result<(), IggyError> {
+ let client = self.client.read().await;
+ let mut retries = 0;
+ loop {
+ match client
+ .send_messages(stream, topic, partitioning, messages)
+ .await
+ {
+ Ok(_) => return Ok(()),
+ Err(error) => {
+ retries += 1;
+ if retries > max_retries {
+ error!(
+ "Failed to send messages to topic: {topic},
stream: {stream} \
+ after {max_retries} retries. {error}."
+ );
+ return Err(error);
+ }
+
+ error!(
+ "Failed to send messages to topic: {topic}, stream:
{stream}. \
+ {error} Retrying {retries}/{max_retries}..."
+ );
+
+ if let Some(t) = timer.as_mut() {
+ trace!(
+ "Waiting for the next retry to send messages to
topic: {topic}, \
+ stream: {stream}..."
+ );
+ t.tick().await;
+ }
+ }
+ }
+ }
+ }
+
+ fn encrypt_messages(&self, messages: &mut [IggyMessage]) -> Result<(),
IggyError> {
+ if let Some(encryptor) = &self.encryptor {
+ for message in messages {
+ message.payload =
Bytes::from(encryptor.encrypt(&message.payload)?);
+ message.header.payload_length = message.payload.len() as u32;
+ }
+ }
+ Ok(())
+ }
+
+ fn get_partitioning(
+ &self,
+ stream: &Identifier,
+ topic: &Identifier,
+ messages: &[IggyMessage],
+ partitioning: Option<Arc<Partitioning>>,
+ ) -> Result<Arc<Partitioning>, IggyError> {
+ if let Some(partitioner) = &self.partitioner {
+ trace!("Calculating partition id using custom partitioner.");
+ let partition_id = partitioner.calculate_partition_id(stream,
topic, messages)?;
+ Ok(Arc::new(Partitioning::partition_id(partition_id)))
+ } else {
+ trace!("Using the provided partitioning.");
+ Ok(partitioning.unwrap_or_else(|| {
+ self.partitioning
+ .clone()
+ .unwrap_or_else(|| self.default_partitioning.clone())
+ }))
+ }
+ }
+
+ async fn wait_before_sending(interval: u64, last_sent_at: u64) {
+ if interval == 0 {
+ return;
+ }
+
+ let now: u64 = IggyTimestamp::now().into();
+ let elapsed = now - last_sent_at;
+ if elapsed >= interval {
+ trace!("No need to wait before sending messages. {now} -
{last_sent_at} = {elapsed}");
+ return;
+ }
+
+ let remaining = interval - elapsed;
+ trace!(
+ "Waiting for {remaining} microseconds before sending messages...
{interval} - {elapsed} = {remaining}"
+ );
+ sleep(Duration::from_micros(remaining)).await;
+ }
+}
+
+struct shard {
+ tx: flume::Sender<Vec<IggyMessage>>,
+ _join_handle: JoinHandle<()>,
+}
+
+// impl shard {
+// fn new(id: usize) -> Self {
+// let (tx, rx) = flume::bounded::<Vec<IggyMessage>>(10); // todo
добавить размер в конфигурацию
+// let handle = tokio::spawn(async move {
+// while let Ok(message) = rx.recv_async().await { // todo
поменять на match
+// }
+// });
+// }
+// }
+
+pub trait ErrorCallback: Send + Sync + Debug {
+ fn call(&self, error: IggyError, messages: Vec<IggyMessage>);
+}
+
+unsafe impl Send for IggyProducer {}
+unsafe impl Sync for IggyProducer {}
+
+pub struct IggyProducer {
+ core: Arc<producerCore>,
+}
+
+impl IggyProducer {
+ #[allow(clippy::too_many_arguments)]
+ pub(crate) fn new(
+ client: IggySharedMut<Box<dyn Client>>,
+ stream: Identifier,
+ stream_name: String,
+ topic: Identifier,
+ topic_name: String,
+ batch_length: Option<usize>,
+ partitioning: Option<Partitioning>,
+ encryptor: Option<Arc<EncryptorKind>>,
+ partitioner: Option<Arc<dyn Partitioner>>,
+ linger_time: Option<IggyDuration>,
+ create_stream_if_not_exists: bool,
+ create_topic_if_not_exists: bool,
+ topic_partitions_count: u32,
+ topic_replication_factor: Option<u8>,
+ topic_message_expiry: IggyExpiry,
+ topic_max_size: MaxTopicSize,
+ send_retries_count: Option<u32>,
+ send_retries_interval: Option<IggyDuration>,
+ send_mode: SendMode,
+ error_callback: Option<Arc<dyn ErrorCallback>>,
+ ) -> Self {
+ Self {
+ core: Arc::new(producerCore {
+ initialized: AtomicBool::new(true),
+ client: Arc::new(client),
+ can_send: Arc::new(AtomicBool::new(true)),
+ stream_id: Arc::new(stream),
+ stream_name,
+ topic_id: Arc::new(topic),
+ topic_name,
+ batch_length,
+ partitioning: partitioning.map(Arc::new),
+ send_mode: Arc::new(send_mode),
+ encryptor,
+ partitioner,
+ linger_time_micros: linger_time.map_or(0, |i| i.as_micros()),
+ create_stream_if_not_exists,
+ create_topic_if_not_exists,
+ topic_partitions_count,
+ topic_replication_factor,
+ topic_message_expiry,
+ topic_max_size,
+ default_partitioning: Arc::new(Partitioning::balanced()),
+ can_send_immediately: linger_time.is_none(),
+ last_sent_at: Arc::new(AtomicU64::new(0)),
+ send_retries_count,
+ send_retries_interval,
+ _join_handle: None,
+ sema: Arc::new(Semaphore::new(10)),
+ sender: None,
+ error_callback,
+ shard_number: default_shard_count(),
+ }),
+ }
+ }
+
+ pub fn stream(&self) -> &Identifier {
+ &self.core.stream_id
+ }
+
+ pub fn topic(&self) -> &Identifier {
+ &self.core.topic_id
+ }
+
+ /// Initializes the producer by subscribing to diagnostic events, creating
the stream and topic if they do not exist etc.
+ ///
+ /// Note: This method must be invoked before producing messages.
+ pub async fn init(&mut self) -> Result<(), IggyError> {
+ self.core.init().await
+ }
+
pub async fn send(&self, messages: Vec<IggyMessage>) -> Result<(),
IggyError> {
if messages.is_empty() {
trace!("No messages to send.");
return Ok(());
}
+
+
if self.can_send_immediately {
return self
.send_immediately(&self.stream_id, &self.topic_id, messages,
None)
@@ -367,12 +545,6 @@ impl IggyProducer {
.await
}
- // todod добавить канал для считывания
- pub async fn send_async(&self, messages: Vec<IggyMessage>) {
- let sender = self.sender.clone();
- sender.unwrap().send_async(messages).await.unwrap();
- }
-
pub async fn send_one(&self, message: IggyMessage) -> Result<(),
IggyError> {
self.send(vec![message]).await
}
@@ -492,98 +664,6 @@ impl IggyProducer {
}
Ok(())
}
-
- async fn send_immediately(
- &self,
- stream: &Identifier,
- topic: &Identifier,
- mut messages: Vec<IggyMessage>,
- partitioning: Option<Arc<Partitioning>>,
- ) -> Result<(), IggyError> {
- trace!("No batch size specified, sending messages immediately.");
- self.encrypt_messages(&mut messages)?;
- let default_partitioning = self.default_partitioning.clone();
- let partitioner = self.partitioner.clone();
- let client = self.client.clone();
- let send_retries_count = self.send_retries_count.clone();
- let send_retries_interval = self.send_retries_interval.clone();
- let can_send = self.can_send.clone();
-
- let partitioning = get_partitioning(
- &partitioner,
- &stream,
- &topic,
- &messages,
- partitioning,
- &self.partitioning,
- default_partitioning,
- )?;
- let batch_length = self.batch_length.unwrap_or(MAX_BATCH_LENGTH);
- if messages.len() <= batch_length {
- self.last_sent_at
- .store(IggyTimestamp::now().into(), ORDERING);
- try_send_messages(
- client.clone(),
- send_retries_count,
- send_retries_interval,
- can_send,
- stream,
- topic,
- &partitioning,
- &mut messages,
- )
- .await?;
- return Ok(());
- }
-
- for batch in messages.chunks_mut(batch_length) {
- let client = client.clone();
- let can_send = can_send.clone();
- self.last_sent_at
- .store(IggyTimestamp::now().into(), ORDERING);
- try_send_messages(
- client,
- send_retries_count,
- send_retries_interval,
- can_send,
- stream,
- topic,
- &partitioning,
- batch,
- )
- .await?;
- }
- Ok(())
- }
-
- async fn wait_before_sending(interval: u64, last_sent_at: u64) {
- if interval == 0 {
- return;
- }
-
- let now: u64 = IggyTimestamp::now().into();
- let elapsed = now - last_sent_at;
- if elapsed >= interval {
- trace!("No need to wait before sending messages. {now} -
{last_sent_at} = {elapsed}");
- return;
- }
-
- let remaining = interval - elapsed;
- trace!(
- "Waiting for {remaining} microseconds before sending messages...
{interval} - {elapsed} = {remaining}"
- );
- sleep(Duration::from_micros(remaining)).await;
- }
-
- fn encrypt_messages(&self, messages: &mut [IggyMessage]) -> Result<(),
IggyError> {
- if let Some(encryptor) = &self.encryptor {
- for message in messages {
- message.payload =
Bytes::from(encryptor.encrypt(&message.payload)?);
- message.header.payload_length = message.payload.len() as u32;
- }
- }
- Ok(())
- }
}
fn get_partitioning(