This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch feat/add-background-send in repository https://gitbox.apache.org/repos/asf/iggy.git
commit d51e69b52dea71a6aca0215daff60b237af87031 Author: haze518 <[email protected]> AuthorDate: Wed May 28 08:44:44 2025 +0600 del --- core/integration/benches/send.rs | 178 ++--- .../tests/examples/test_new_publisher.rs | 149 ++-- core/sdk/src/clients/producer.rs | 861 +++++++-------------- core/sdk/src/clients/send_mode.rs | 109 ++- 4 files changed, 555 insertions(+), 742 deletions(-) diff --git a/core/integration/benches/send.rs b/core/integration/benches/send.rs index 1bb7b316..39273f27 100644 --- a/core/integration/benches/send.rs +++ b/core/integration/benches/send.rs @@ -1,102 +1,102 @@ -use std::sync::Arc; +// use std::sync::Arc; -use bytes::Bytes; -use criterion::{criterion_group, criterion_main, Criterion}; -use iggy::clients::send_mode::{BackgroundConfig, BackpressureMode, SendMode}; -use iggy::prelude::*; -use iggy::{clients::client::IggyClient, prelude::TcpClient}; -use iggy_common::TcpClientConfig; -use integration::test_server::{IpAddrKind, TestServer}; -use tokio::runtime::Runtime; +// use bytes::Bytes; +// use criterion::{criterion_group, criterion_main, Criterion}; +// use iggy::clients::send_mode::{BackgroundConfig, BackpressureMode, SendMode}; +// use iggy::prelude::*; +// use iggy::{clients::client::IggyClient, prelude::TcpClient}; +// use iggy_common::TcpClientConfig; +// use integration::test_server::{IpAddrKind, TestServer}; +// use tokio::runtime::Runtime; -fn bench_send(c: &mut Criterion) { - let rt = Runtime::new().unwrap(); +// fn bench_send(c: &mut Criterion) { +// let rt = Runtime::new().unwrap(); - // SETUP вне замера - let (mut producer, batch): (IggyProducer, Vec<IggyMessage>) = rt.block_on(async { - // старт сервера - let mut server = TestServer::new(None, true, None, IpAddrKind::V4); - server.start(); +// // SETUP вне замера +// let (mut producer, batch): (IggyProducer, Vec<IggyMessage>) = rt.block_on(async { +// // старт сервера +// let mut server = TestServer::new(None, true, None, IpAddrKind::V4); +// server.start(); - let tcp_client_config = TcpClientConfig { - server_address: server.get_raw_tcp_addr().unwrap(), - ..TcpClientConfig::default() - }; +// let tcp_client_config = TcpClientConfig { +// server_address: server.get_raw_tcp_addr().unwrap(), +// ..TcpClientConfig::default() +// }; - let client = Box::new(TcpClient::create(Arc::new(tcp_client_config)).unwrap()); - let client = IggyClient::create(client, None, None); +// let client = Box::new(TcpClient::create(Arc::new(tcp_client_config)).unwrap()); +// let client = IggyClient::create(client, None, None); - client.connect().await.unwrap(); - client - .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) - .await - .unwrap(); - client - .create_stream("sample-stream", Some(1)) - .await - .unwrap(); - client - .create_topic( - &1.try_into().unwrap(), - "sample-topic", - 1, - CompressionAlgorithm::default(), - None, - None, - IggyExpiry::NeverExpire, - MaxTopicSize::ServerDefault, - ) - .await - .unwrap(); +// client.connect().await.unwrap(); +// client +// .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) +// .await +// .unwrap(); +// client +// .create_stream("sample-stream", Some(1)) +// .await +// .unwrap(); +// client +// .create_topic( +// &1.try_into().unwrap(), +// "sample-topic", +// 1, +// CompressionAlgorithm::default(), +// None, +// None, +// IggyExpiry::NeverExpire, +// MaxTopicSize::ServerDefault, +// ) +// .await +// .unwrap(); - let mut producer = client - .producer("1", "1") - .unwrap() - .batch_length(10) - .send_mode(SendMode::Background(BackgroundConfig { - max_in_flight: 10, - in_flight_timeout: None, - batch_size: None, - failure_mode: BackpressureMode::Block, - })) - .build(); +// let mut producer = client +// .producer("1", "1") +// .unwrap() +// .batch_length(10) +// .send_mode(SendMode::Background(BackgroundConfig { +// max_in_flight: 10, +// in_flight_timeout: None, +// batch_size: None, +// failure_mode: BackpressureMode::Block, +// })) +// .build(); - producer.init().await.unwrap(); +// producer.init().await.unwrap(); - // заранее создаём батч - let mut messages = Vec::new(); - for _ in 0..10 { - let payload = Bytes::from(vec![0u8; 1024]); - let msg = IggyMessage::builder() - .payload(payload) - .build() - .unwrap(); - messages.push(msg); - } +// // заранее создаём батч +// let mut messages = Vec::new(); +// for _ in 0..10 { +// let payload = Bytes::from(vec![0u8; 1024]); +// let msg = IggyMessage::builder() +// .payload(payload) +// .build() +// .unwrap(); +// messages.push(msg); +// } - (producer, messages) - }); +// (producer, messages) +// }); - c.bench_function("producer.send(batch)", |b| { - b.to_async(&rt).iter(|| { - let batch = create_batch(); - producer.send(batch) - }); - }); -} +// c.bench_function("producer.send(batch)", |b| { +// b.to_async(&rt).iter(|| { +// let batch = create_batch(); +// producer.send(batch) +// }); +// }); +// } -fn create_batch() -> Vec<IggyMessage> { - let mut messages = Vec::new(); - for _ in 0..1 { - let payload = Bytes::from(vec![0u8; 1024]); - let msg = IggyMessage::builder() - .payload(payload) - .build() - .unwrap(); - messages.push(msg); - } - messages -} +// fn create_batch() -> Vec<IggyMessage> { +// let mut messages = Vec::new(); +// for _ in 0..1 { +// let payload = Bytes::from(vec![0u8; 1024]); +// let msg = IggyMessage::builder() +// .payload(payload) +// .build() +// .unwrap(); +// messages.push(msg); +// } +// messages +// } -criterion_group!(benches, bench_send); -criterion_main!(benches); +// criterion_group!(benches, bench_send); +// criterion_main!(benches); diff --git a/core/integration/tests/examples/test_new_publisher.rs b/core/integration/tests/examples/test_new_publisher.rs index 771fac8f..d9a2517e 100644 --- a/core/integration/tests/examples/test_new_publisher.rs +++ b/core/integration/tests/examples/test_new_publisher.rs @@ -15,7 +15,6 @@ use integration::test_server::{IpAddrKind, TestServer}; #[tokio::test] async fn test_new_publisher() { - // new let mut server = TestServer::new(None, true, None, IpAddrKind::V4); server.start(); @@ -26,106 +25,90 @@ async fn test_new_publisher() { let client = Box::new(TcpClient::create(Arc::new(tcp_client_config)).unwrap()); let client = IggyClient::create(client, None, None); - // setup client.connect().await.unwrap(); - let ping_result = client.ping().await; - assert!(ping_result.is_ok(), "Failed to ping server"); - - client - .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) - .await - .unwrap(); - client - .create_stream("sample-stream", Some(1)) - .await - .unwrap(); - client - .create_topic( - &1.try_into().unwrap(), - "sample-topic", - 1, - CompressionAlgorithm::default(), - None, - None, - IggyExpiry::NeverExpire, - MaxTopicSize::ServerDefault, - ) - .await - .unwrap(); - - let mut producer = client + assert!(client.ping().await.is_ok(), "Failed to ping server"); + + client.login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD).await.unwrap(); + client.create_stream("sample-stream", Some(1)).await.unwrap(); + client.create_topic( + &1.try_into().unwrap(), + "sample-topic", + 1, + CompressionAlgorithm::default(), + None, + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ).await.unwrap(); + + let producer = client .producer("1", "1") .unwrap() .batch_length(10) - .send_mode(SendMode::Sync) - // .send_mode(SendMode::Background(BackgroundConfig { - // max_in_flight: 10, - // in_flight_timeout: None, - // batch_size: None, - // failure_mode: BackpressureMode::Block, - // })) + .send_mode(SendMode::Background) .build(); producer.init().await.unwrap(); - // produce - let mut send_batches = 0; - - let batch_limit = 10000; - let mut count = 0; - let mut t = Vec::new(); - while send_batches < batch_limit { + let batches_to_send = 10_000; + let messages_per_batch = 10; + let total_expected = batches_to_send * messages_per_batch; + + for _ in 0..batches_to_send { let start = Instant::now(); - let mut messages = Vec::new(); - for _ in 0..10 { - let payload = Bytes::from(vec![0u8; 1024]); - let msg = IggyMessage::builder() - .payload(payload) - .build() - .unwrap(); - messages.push(msg); - count += 1; - } + let messages: Vec<_> = (0..messages_per_batch) + .map(|_| { + IggyMessage::builder() + .payload(Bytes::from(vec![0u8; 1024])) + .build() + .unwrap() + }) + .collect(); producer.send(messages).await.unwrap(); - send_batches += 1; - let duration = start.elapsed().as_millis(); - t.push(duration); + t.push(start.elapsed().as_millis()); + } + + let mut consumer = client + .consumer_group("some-consumer", "1", "1") + .unwrap() + .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages)) + .create_consumer_group_if_not_exists() + .auto_join_consumer_group() + .polling_strategy(PollingStrategy::next()) + .poll_interval(IggyDuration::from_str("1ms").unwrap()) + .batch_length(10) + .build(); + + consumer.init().await.unwrap(); + + let mut received = 0; + while let Some(msg) = consumer.next().await { + match msg { + Ok(_) => { + received += 1; + if received >= total_expected { + break; + } + } + Err(e) => panic!("Consumer error: {}", e), + } } - // let mut consumer = client - // .consumer_group("some-consumer", "1", "1") - // .unwrap() - // .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages)) - // .create_consumer_group_if_not_exists() - // .auto_join_consumer_group() - // .polling_strategy(PollingStrategy::next()) - // .poll_interval(IggyDuration::from_str("1ms").unwrap()) - // .batch_length(10) - // .build(); - - // consumer.init().await.unwrap(); - - // let mut consumed_batches = 0; - // while let Some(message) = consumer.next().await { - // if consumed_batches >= batch_limit { - // break - // } - - // if let Ok(message) = message { - // // println!("got message"); - // consumed_batches += 1; - // } else if let Err(error) = message { - // panic!("{}", error.to_string()); - // } - // } + assert_eq!( + received, total_expected, + "Not all messages received: got {}, expected {}", + received, total_expected + ); let total: u128 = t.iter().sum(); let avg = total as f64 / t.len() as f64; - println!("Среднее время выполнения одного батча: {:.3} мс", avg); + println!("Среднее время отправки одного батча: {:.3} мс", avg); } -// sync: Время выполнения: 13.937269459s + +// sync: avg send: 1.561ms; overall: 46.71s +// async: avg send: 0.356ms; overall: 28.67s diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs index 9ba3026d..ba2ccc94 100644 --- a/core/sdk/src/clients/producer.rs +++ b/core/sdk/src/clients/producer.rs @@ -1,4 +1,4 @@ -use super::send_mode::{BackpressureMode, SendMode}; +use super::send_mode::{shardMessage, BackgroundConfig, BackpressureMode, Dispatcher, SendMode}; /* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -28,41 +28,16 @@ use iggy_common::{ }; use std::fmt::Debug; use std::sync::Arc; +use std::sync::atomic::Ordering; use std::sync::atomic::{AtomicBool, AtomicU64}; -use std::time::{Duration, Instant}; +use std::time::Duration; use tokio::sync::Semaphore; use tokio::task::JoinHandle; use tokio::time::{Interval, sleep}; use tracing::{error, info, trace, warn}; -use dashmap::DashMap; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering; - -struct shard { - tx: flume::Sender<Vec<IggyMessage>>, - _join_handle: JoinHandle<()>, -} - -impl shard { - fn new(id: usize) -> Self { - let (tx, rx) = flume::bounded::<Vec<IggyMessage>>(10); // todo добавить размер в конфигурацию - let handle = tokio::spawn(async move { - while let Ok(message) = rx.recv_async().await { // todo поменять на match - - } - }); - } -} -pub trait ErrorCallback: Send + Sync + Debug { - fn call(&self, error: IggyError, messages: Vec<IggyMessage>); -} - -unsafe impl Send for IggyProducer {} -unsafe impl Sync for IggyProducer {} - -pub struct IggyProducer { - initialized: bool, +pub struct ProducerCore { + initialized: AtomicBool, can_send: Arc<AtomicBool>, client: Arc<IggySharedMut<Box<dyn Client>>>, stream_id: Arc<Identifier>, @@ -71,7 +46,6 @@ pub struct IggyProducer { topic_name: String, batch_length: Option<usize>, partitioning: Option<Arc<Partitioning>>, - send_mode: Arc<SendMode>, encryptor: Option<Arc<EncryptorKind>>, partitioner: Option<Arc<dyn Partitioner>>, linger_time_micros: u64, @@ -92,79 +66,14 @@ pub struct IggyProducer { sender: Option<Arc<flume::Sender<Vec<IggyMessage>>>>, error_callback: Option<Arc<dyn ErrorCallback>>, shard_number: usize, - // todo добавить ShardStrategy } -impl IggyProducer { - #[allow(clippy::too_many_arguments)] - pub(crate) fn new( - client: IggySharedMut<Box<dyn Client>>, - stream: Identifier, - stream_name: String, - topic: Identifier, - topic_name: String, - batch_length: Option<usize>, - partitioning: Option<Partitioning>, - encryptor: Option<Arc<EncryptorKind>>, - partitioner: Option<Arc<dyn Partitioner>>, - linger_time: Option<IggyDuration>, - create_stream_if_not_exists: bool, - create_topic_if_not_exists: bool, - topic_partitions_count: u32, - topic_replication_factor: Option<u8>, - topic_message_expiry: IggyExpiry, - topic_max_size: MaxTopicSize, - send_retries_count: Option<u32>, - send_retries_interval: Option<IggyDuration>, - send_mode: SendMode, - error_callback: Option<Arc<dyn ErrorCallback>>, - ) -> Self { - Self { - initialized: false, - client: Arc::new(client), - can_send: Arc::new(AtomicBool::new(true)), - stream_id: Arc::new(stream), - stream_name, - topic_id: Arc::new(topic), - topic_name, - batch_length, - partitioning: partitioning.map(Arc::new), - send_mode: Arc::new(send_mode), - encryptor, - partitioner, - linger_time_micros: linger_time.map_or(0, |i| i.as_micros()), - create_stream_if_not_exists, - create_topic_if_not_exists, - topic_partitions_count, - topic_replication_factor, - topic_message_expiry, - topic_max_size, - default_partitioning: Arc::new(Partitioning::balanced()), - can_send_immediately: linger_time.is_none(), - last_sent_at: Arc::new(AtomicU64::new(0)), - send_retries_count, - send_retries_interval, - _join_handle: None, - sema: Arc::new(Semaphore::new(10)), - sender: None, - error_callback, - shard_number: default_shard_count(), - } - } - - pub fn stream(&self) -> &Identifier { - &self.stream_id - } - - pub fn topic(&self) -> &Identifier { - &self.topic_id - } - - /// Initializes the producer by subscribing to diagnostic events, creating the stream and topic if they do not exist etc. - /// - /// Note: This method must be invoked before producing messages. - pub async fn init(&mut self) -> Result<(), IggyError> { - if self.initialized { +impl ProducerCore { + pub async fn init(&self) -> Result<(), IggyError> { + if self.initialized.compare_exchange( + false, true, + Ordering::SeqCst, Ordering::SeqCst, + ).is_err() { return Ok(()); } @@ -222,85 +131,6 @@ impl IggyProducer { .await?; } - let (tx, rx) = flume::bounded::<Vec<IggyMessage>>(0); // todo задать какое-то значение - let stream_id = self.stream_id.clone(); - let topic_id = self.topic_id.clone(); - let partitioning = self.partitioning.clone(); - let default_partitioning = self.default_partitioning.clone(); - let partitioner = self.partitioner.clone(); - let client = self.client.clone(); - let send_retries_count = self.send_retries_count.clone(); - let send_retries_interval = self.send_retries_interval.clone(); - let can_send = self.can_send.clone(); - let send_batches = AtomicUsize::new(0); - let num_shard = self.shard_number; - // let sema = self.sema.clone(); - - let handle = tokio::spawn(async move { - while let Ok(mut batch) = rx.recv_async().await { - // let sema = sema.clone(); - // let partitioner = partitioner.clone(); - // let stream_id = stream_id.clone(); - // let topic_id = topic_id.clone(); - // let partitioning = partitioning.clone(); - // let default_partitioning = default_partitioning.clone(); - // let client = client.clone(); - // let can_send = can_send.clone(); - - // if round_robin - let shard_ix = send_batches.fetch_add(1, Ordering::SeqCst) % num_shard; - - - let partitioning = get_partitioning( - &partitioner, - &stream_id, - &topic_id, - &batch, - partitioning.clone(), - &partitioning, - default_partitioning.clone(), - ) - .unwrap(); - try_send_messages( - client.clone(), - send_retries_count, - send_retries_interval, - can_send.clone(), - &stream_id, - &topic_id, - &partitioning, - &mut batch, - ) - .await - .unwrap(); - // tokio::spawn(async move { - // let _permit = sema.acquire().await.unwrap(); - // let partitioning = get_partitioning( - // &partitioner, - // &stream_id, - // &topic_id, - // &batch, - // partitioning.clone(), - // &partitioning, - // default_partitioning.clone(), - // ) - // .unwrap(); - // try_send_messages( - // client.clone(), - // send_retries_count, - // send_retries_interval, - // can_send.clone(), - // &stream_id, - // &topic_id, - // &partitioning, - // &mut batch, - // ).await.unwrap(); - // }); - } - }); - self._join_handle = Some(handle); - self.sender = Some(Arc::new(tx)); - self.initialized = true; info!( "Producer has been initialized for stream: {} and topic: {}.", self.stream_id.clone(), @@ -346,214 +176,184 @@ impl IggyProducer { }); } - pub async fn send(&self, messages: Vec<IggyMessage>) -> Result<(), IggyError> { - if messages.is_empty() { - trace!("No messages to send."); + pub(crate) async fn send_internal( + &self, + stream: &Identifier, + topic: &Identifier, + mut msgs: Vec<IggyMessage>, + partitioning: Option<Arc<Partitioning>>, + ) -> Result<(), IggyError> { + if msgs.is_empty() { return Ok(()); } - if self.can_send_immediately { - return self - .send_immediately(&self.stream_id, &self.topic_id, messages, None) - .await; - } + self.encrypt_messages(&mut msgs)?; - self.send_buffered( - self.stream_id.clone(), - self.topic_id.clone(), - messages, - None, - ) - .await - } + let part = self.get_partitioning(stream, topic, &msgs, partitioning)?; - // todod добавить канал для считывания - pub async fn send_async(&self, messages: Vec<IggyMessage>) { - let sender = self.sender.clone(); - sender.unwrap().send_async(messages).await.unwrap(); - } + let max = self.batch_length.unwrap_or(MAX_BATCH_LENGTH); - pub async fn send_one(&self, message: IggyMessage) -> Result<(), IggyError> { - self.send(vec![message]).await + if !self.can_send_immediately && self.linger_time_micros > 0 { + Self::wait_before_sending(self.linger_time_micros, self.last_sent_at.load(ORDERING)) + .await; + } + + for chunk in msgs.chunks_mut(max) { + self.last_sent_at + .store(IggyTimestamp::now().into(), ORDERING); + self.try_send_messages(stream, topic, &part, chunk).await?; + } + Ok(()) } - pub async fn send_with_partitioning( + async fn try_send_messages( &self, - messages: Vec<IggyMessage>, - partitioning: Option<Arc<Partitioning>>, + stream: &Identifier, + topic: &Identifier, + partitioning: &Arc<Partitioning>, + messages: &mut [IggyMessage], ) -> Result<(), IggyError> { - if messages.is_empty() { - trace!("No messages to send."); - return Ok(()); - } + let client = self.client.read().await; + let Some(max_retries) = self.send_retries_count else { + return client + .send_messages(stream, topic, partitioning, messages) + .await; + }; - if self.can_send_immediately { - return self - .send_immediately(&self.stream_id, &self.topic_id, messages, partitioning) + if max_retries == 0 { + return client + .send_messages(stream, topic, partitioning, messages) .await; } - self.send_buffered( - self.stream_id.clone(), - self.topic_id.clone(), - messages, + let mut timer = if let Some(interval) = self.send_retries_interval { + let mut timer = tokio::time::interval(interval.get_duration()); + timer.tick().await; + Some(timer) + } else { + None + }; + + self.wait_until_connected(max_retries, stream, topic, &mut timer) + .await?; + self.send_with_retries( + max_retries, + stream, + topic, partitioning, + messages, + &mut timer, ) .await } - pub async fn send_to( + async fn wait_until_connected( &self, - stream: Arc<Identifier>, - topic: Arc<Identifier>, - messages: Vec<IggyMessage>, - partitioning: Option<Arc<Partitioning>>, + max_retries: u32, + stream: &Identifier, + topic: &Identifier, + timer: &mut Option<Interval>, ) -> Result<(), IggyError> { - if messages.is_empty() { - trace!("No messages to send."); - return Ok(()); - } + let mut retries = 0; + while !self.can_send.load(ORDERING) { + retries += 1; + if retries > max_retries { + error!( + "Failed to send messages to topic: {topic}, stream: {stream} \ + after {max_retries} retries. Client is disconnected." + ); + return Err(IggyError::CannotSendMessagesDueToClientDisconnection); + } - if self.can_send_immediately { - return self - .send_immediately(&self.stream_id, &self.topic_id, messages, partitioning) - .await; - } + error!( + "Trying to send messages to topic: {topic}, stream: {stream} \ + but the client is disconnected. Retrying {retries}/{max_retries}..." + ); - self.send_buffered(stream, topic, messages, partitioning) - .await + if let Some(timer) = timer.as_mut() { + trace!( + "Waiting for the next retry to send messages to topic: {topic}, \ + stream: {stream} for disconnected client..." + ); + timer.tick().await; + } + } + Ok(()) } - // TODO add batch_size - async fn send_buffered( + async fn send_with_retries( &self, - stream: Arc<Identifier>, - topic: Arc<Identifier>, - mut messages: Vec<IggyMessage>, - partitioning: Option<Arc<Partitioning>>, + max_retries: u32, + stream: &Identifier, + topic: &Identifier, + partitioning: &Arc<Partitioning>, + messages: &mut [IggyMessage], + timer: &mut Option<Interval>, ) -> Result<(), IggyError> { - self.encrypt_messages(&mut messages)?; - let default_partitioning = self.default_partitioning.clone(); - let partitioner = self.partitioner.clone(); - let partitioning = get_partitioning( - &partitioner, - &stream, - &topic, - &messages, - partitioning, - &self.partitioning, - default_partitioning, - )?; - let batch_length = self.batch_length.unwrap_or(MAX_BATCH_LENGTH); - let batches = messages.chunks_mut(batch_length); - let mut current_batch = 1; - let batches_count = batches.len(); - for batch in batches { - if self.linger_time_micros > 0 { - Self::wait_before_sending( - self.linger_time_micros, - self.last_sent_at.load(ORDERING), - ) - .await; - } + let client = self.client.read().await; + let mut retries = 0; + loop { + match client + .send_messages(stream, topic, partitioning, messages) + .await + { + Ok(_) => return Ok(()), + Err(error) => { + retries += 1; + if retries > max_retries { + error!( + "Failed to send messages to topic: {topic}, stream: {stream} \ + after {max_retries} retries. {error}." + ); + return Err(error); + } - let messages_count = batch.len(); - trace!( - "Sending {messages_count} messages ({current_batch}/{batches_count} batch(es))..." - ); - self.last_sent_at - .store(IggyTimestamp::now().into(), ORDERING); + error!( + "Failed to send messages to topic: {topic}, stream: {stream}. \ + {error} Retrying {retries}/{max_retries}..." + ); - let client = self.client.clone(); - let send_retries_count = self.send_retries_count.clone(); - let send_retries_interval = self.send_retries_interval.clone(); - let can_send = self.can_send.clone(); - - let sender = self.sender.clone(); - let send_mode = self.send_mode.clone(); - try_send_messages_new( - BackpressureMode::Block, - sender, - self.error_callback.clone(), - send_mode, - client, - send_retries_count, - send_retries_interval, - can_send, - &self.stream_id, - &self.topic_id, - &partitioning, - batch, - ) - .await?; + if let Some(t) = timer.as_mut() { + trace!( + "Waiting for the next retry to send messages to topic: {topic}, \ + stream: {stream}..." + ); + t.tick().await; + } + } + } + } + } - trace!("Sent {messages_count} messages ({current_batch}/{batches_count} batch(es))."); - current_batch += 1; + fn encrypt_messages(&self, messages: &mut [IggyMessage]) -> Result<(), IggyError> { + if let Some(encryptor) = &self.encryptor { + for message in messages { + message.payload = Bytes::from(encryptor.encrypt(&message.payload)?); + message.header.payload_length = message.payload.len() as u32; + } } Ok(()) } - async fn send_immediately( + fn get_partitioning( &self, stream: &Identifier, topic: &Identifier, - mut messages: Vec<IggyMessage>, + messages: &[IggyMessage], partitioning: Option<Arc<Partitioning>>, - ) -> Result<(), IggyError> { - trace!("No batch size specified, sending messages immediately."); - self.encrypt_messages(&mut messages)?; - let default_partitioning = self.default_partitioning.clone(); - let partitioner = self.partitioner.clone(); - let client = self.client.clone(); - let send_retries_count = self.send_retries_count.clone(); - let send_retries_interval = self.send_retries_interval.clone(); - let can_send = self.can_send.clone(); - - let partitioning = get_partitioning( - &partitioner, - &stream, - &topic, - &messages, - partitioning, - &self.partitioning, - default_partitioning, - )?; - let batch_length = self.batch_length.unwrap_or(MAX_BATCH_LENGTH); - if messages.len() <= batch_length { - self.last_sent_at - .store(IggyTimestamp::now().into(), ORDERING); - try_send_messages( - client.clone(), - send_retries_count, - send_retries_interval, - can_send, - stream, - topic, - &partitioning, - &mut messages, - ) - .await?; - return Ok(()); - } - - for batch in messages.chunks_mut(batch_length) { - let client = client.clone(); - let can_send = can_send.clone(); - self.last_sent_at - .store(IggyTimestamp::now().into(), ORDERING); - try_send_messages( - client, - send_retries_count, - send_retries_interval, - can_send, - stream, - topic, - &partitioning, - batch, - ) - .await?; + ) -> Result<Arc<Partitioning>, IggyError> { + if let Some(partitioner) = &self.partitioner { + trace!("Calculating partition id using custom partitioner."); + let partition_id = partitioner.calculate_partition_id(stream, topic, messages)?; + Ok(Arc::new(Partitioning::partition_id(partition_id))) + } else { + trace!("Using the provided partitioning."); + Ok(partitioning.unwrap_or_else(|| { + self.partitioning + .clone() + .unwrap_or_else(|| self.default_partitioning.clone()) + })) } - Ok(()) } async fn wait_before_sending(interval: u64, last_sent_at: u64) { @@ -574,247 +374,174 @@ impl IggyProducer { ); sleep(Duration::from_micros(remaining)).await; } +} - fn encrypt_messages(&self, messages: &mut [IggyMessage]) -> Result<(), IggyError> { - if let Some(encryptor) = &self.encryptor { - for message in messages { - message.payload = Bytes::from(encryptor.encrypt(&message.payload)?); - message.header.payload_length = message.payload.len() as u32; - } - } - Ok(()) - } + +pub trait ErrorCallback: Send + Sync + Debug { + fn call(&self, error: IggyError, messages: Vec<IggyMessage>); } -fn get_partitioning( - partitioner: &Option<Arc<dyn Partitioner>>, - stream: &Identifier, - topic: &Identifier, - messages: &[IggyMessage], - partitioning: Option<Arc<Partitioning>>, - producer_partitioning: &Option<Arc<Partitioning>>, - default_partitioning: Arc<Partitioning>, -) -> Result<Arc<Partitioning>, IggyError> { - if let Some(partitioner) = partitioner { - trace!("Calculating partition id using custom partitioner."); - let partition_id = partitioner.calculate_partition_id(stream, topic, messages)?; - Ok(Arc::new(Partitioning::partition_id(partition_id))) - } else { - trace!("Using the provided partitioning."); - Ok(partitioning.unwrap_or_else(|| { - producer_partitioning - .clone() - .unwrap_or_else(|| default_partitioning.clone()) - })) - } +unsafe impl Send for IggyProducer {} +unsafe impl Sync for IggyProducer {} + +pub struct IggyProducer { + core: Arc<ProducerCore>, + send_mode: SendMode, + dispatcher: Option<Dispatcher>, } -async fn wait_until_connected( - can_send: Arc<AtomicBool>, - max_retries: u32, - stream: &Identifier, - topic: &Identifier, - timer: &mut Option<Interval>, -) -> Result<(), IggyError> { - let mut retries = 0; - while !can_send.load(ORDERING) { - retries += 1; - if retries > max_retries { - error!( - "Failed to send messages to topic: {topic}, stream: {stream} \ - after {max_retries} retries. Client is disconnected." - ); - return Err(IggyError::CannotSendMessagesDueToClientDisconnection); +impl IggyProducer { + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( + client: IggySharedMut<Box<dyn Client>>, + stream: Identifier, + stream_name: String, + topic: Identifier, + topic_name: String, + batch_length: Option<usize>, + partitioning: Option<Partitioning>, + encryptor: Option<Arc<EncryptorKind>>, + partitioner: Option<Arc<dyn Partitioner>>, + linger_time: Option<IggyDuration>, + create_stream_if_not_exists: bool, + create_topic_if_not_exists: bool, + topic_partitions_count: u32, + topic_replication_factor: Option<u8>, + topic_message_expiry: IggyExpiry, + topic_max_size: MaxTopicSize, + send_retries_count: Option<u32>, + send_retries_interval: Option<IggyDuration>, + send_mode: SendMode, + error_callback: Option<Arc<dyn ErrorCallback>>, + ) -> Self { + let core = Arc::new(ProducerCore { + initialized: AtomicBool::new(true), + client: Arc::new(client), + can_send: Arc::new(AtomicBool::new(true)), + stream_id: Arc::new(stream), + stream_name, + topic_id: Arc::new(topic), + topic_name, + batch_length, + partitioning: partitioning.map(Arc::new), + encryptor, + partitioner, + linger_time_micros: linger_time.map_or(0, |i| i.as_micros()), + create_stream_if_not_exists, + create_topic_if_not_exists, + topic_partitions_count, + topic_replication_factor, + topic_message_expiry, + topic_max_size, + default_partitioning: Arc::new(Partitioning::balanced()), + can_send_immediately: linger_time.is_none(), + last_sent_at: Arc::new(AtomicU64::new(0)), + send_retries_count, + send_retries_interval, + _join_handle: None, + sema: Arc::new(Semaphore::new(10)), + sender: None, + error_callback, + shard_number: default_shard_count(), + }); + let num_shards = default_shard_count(); // todo потом заменмить на норм значепние + let config = BackgroundConfig{ + max_in_flight: 4, + in_flight_timeout: None, + batch_size: None, + failure_mode: BackpressureMode::Block, + }; + let mut dispatcher = None; + if send_mode == SendMode::Background { + dispatcher = Some(Dispatcher::new(core.clone(), num_shards, Arc::new(config))); } - error!( - "Trying to send messages to topic: {topic}, stream: {stream} \ - but the client is disconnected. Retrying {retries}/{max_retries}..." - ); - - if let Some(timer) = timer.as_mut() { - trace!( - "Waiting for the next retry to send messages to topic: {topic}, \ - stream: {stream} for disconnected client..." - ); - timer.tick().await; + Self { + core, + send_mode, + dispatcher: dispatcher, } } - Ok(()) -} -async fn send_with_retries( - client: Arc<IggySharedMut<Box<dyn Client>>>, - max_retries: u32, - stream: &Identifier, - topic: &Identifier, - partitioning: &Arc<Partitioning>, - messages: &mut [IggyMessage], - timer: &mut Option<Interval>, -) -> Result<(), IggyError> { - let client = client.read().await; - let mut retries = 0; - loop { - match client - .send_messages(stream, topic, partitioning, messages) - .await - { - Ok(_) => return Ok(()), - Err(error) => { - retries += 1; - if retries > max_retries { - error!( - "Failed to send messages to topic: {topic}, stream: {stream} \ - after {max_retries} retries. {error}." - ); - return Err(error); - } + pub fn stream(&self) -> &Identifier { + &self.core.stream_id + } - error!( - "Failed to send messages to topic: {topic}, stream: {stream}. \ - {error} Retrying {retries}/{max_retries}..." - ); + pub fn topic(&self) -> &Identifier { + &self.core.topic_id + } - if let Some(t) = timer.as_mut() { - trace!( - "Waiting for the next retry to send messages to topic: {topic}, \ - stream: {stream}..." - ); - t.tick().await; + /// Initializes the producer by subscribing to diagnostic events, creating the stream and topic if they do not exist etc. + /// + /// Note: This method must be invoked before producing messages. + pub async fn init(&self) -> Result<(), IggyError> { + self.core.init().await + } + + pub async fn send(&self, messages: Vec<IggyMessage>) -> Result<(), IggyError> { + if messages.is_empty() { + trace!("No messages to send."); + return Ok(()); + } + + let stream_id = self.core.stream_id.clone(); + let topic_id = self.core.topic_id.clone(); + + match &self.send_mode { + SendMode::Sync => { + self.core.send_internal(&stream_id, &topic_id, messages, None).await + + } + SendMode::Background => { + match &self.dispatcher { + Some(disp) => { + disp.dispatch(shardMessage{ + messages, + stream: stream_id, + topic: topic_id, + partitioning: None, + }).await.map_err(|err| IggyError::BackgroundSendError) + } + None => Ok(()) } } } - } -} -async fn try_send_messages( - client: Arc<IggySharedMut<Box<dyn Client>>>, - send_retries_count: Option<u32>, - send_retries_interval: Option<IggyDuration>, - can_send: Arc<AtomicBool>, - stream: &Identifier, - topic: &Identifier, - partitioning: &Arc<Partitioning>, - messages: &mut [IggyMessage], -) -> Result<(), IggyError> { - let rw_client = client.read().await; - let Some(max_retries) = send_retries_count else { - return rw_client - .send_messages(stream, topic, partitioning, messages) - .await; - }; - - if max_retries == 0 { - return rw_client - .send_messages(stream, topic, partitioning, messages) - .await; } - let mut timer = if let Some(interval) = send_retries_interval { - let mut timer = tokio::time::interval(interval.get_duration()); - timer.tick().await; - Some(timer) - } else { - None - }; - - wait_until_connected(can_send.clone(), max_retries, stream, topic, &mut timer).await?; - send_with_retries( - client.clone(), - max_retries, - stream, - topic, - partitioning, - messages, - &mut timer, - ) - .await -} + pub async fn send_one(&self, message: IggyMessage) -> Result<(), IggyError> { + self.send(vec![message]).await + } -async fn try_send_messages_new( - backpressure_mode: BackpressureMode, - sender: Option<Arc<flume::Sender<Vec<IggyMessage>>>>, - error_callback: Option<Arc<dyn ErrorCallback>>, - send_mode: Arc<SendMode>, - client: Arc<IggySharedMut<Box<dyn Client>>>, - send_retries_count: Option<u32>, - send_retries_interval: Option<IggyDuration>, - can_send: Arc<AtomicBool>, - stream: &Identifier, - topic: &Identifier, - partitioning: &Arc<Partitioning>, - messages: &mut [IggyMessage], -) -> Result<(), IggyError> { - match &*send_mode { - SendMode::Sync => { - try_send_messages( - client, - send_retries_count, - send_retries_interval, - can_send, - stream, - topic, - partitioning, - messages, - ) - .await + pub async fn send_with_partitioning( + &self, + messages: Vec<IggyMessage>, + partitioning: Option<Arc<Partitioning>>, + ) -> Result<(), IggyError> { + if messages.is_empty() { + trace!("No messages to send."); + return Ok(()); } - SendMode::Background(_cfg) => { - let out: Vec<IggyMessage> = messages.iter_mut().map(std::mem::take).collect(); - let sender = sender.as_ref().expect("init() must set sender").clone(); - - match sender.try_send(out) { - Ok(()) => Ok(()), - - Err(flume::TrySendError::Full(out)) => match backpressure_mode { - BackpressureMode::Block => match sender.send_async(out).await { - Ok(()) => Ok(()), - Err(flume::SendError(out)) => { - if let Some(cb) = error_callback { - cb.call(IggyError::BackgroundSendError, out); - } - Err(IggyError::BackgroundSendError) - } - }, - - BackpressureMode::BlockWithTimeout(t) => { - match tokio::time::timeout(t.get_duration(), sender.send_async(out)).await { - Ok(Ok(())) => Ok(()), - - Ok(Err(flume::SendError(out))) => { - if let Some(cb) = error_callback { - cb.call(IggyError::BackgroundSendTimeout, out); - } - Err(IggyError::BackgroundSendTimeout) - } - - Err(_) => { - if let Some(cb) = &error_callback { - cb.call(IggyError::BackgroundSendTimeout, vec![]); - } - Err(IggyError::BackgroundSendTimeout) - } - } - } + let stream_id = &self.core.stream_id; + let topic_id = &self.core.topic_id; - BackpressureMode::FailImmediately => { - if let Some(cb) = &error_callback { - cb.call(IggyError::BackgroundSendBufferFull, out); - } - Err(IggyError::BackgroundSendBufferFull) - } - }, + self.core.send_internal(stream_id, topic_id, messages, partitioning).await + } - Err(flume::TrySendError::Disconnected(out)) => { - if let Some(cb) = &error_callback { - cb.call(IggyError::BackgroundWorkerDisconnected, out); - } - error!("Background worker has shut down."); - Err(IggyError::BackgroundWorkerDisconnected) - } - } + pub async fn send_to( + &self, + stream: Arc<Identifier>, + topic: Arc<Identifier>, + messages: Vec<IggyMessage>, + partitioning: Option<Arc<Partitioning>>, + ) -> Result<(), IggyError> { + if messages.is_empty() { + trace!("No messages to send."); + return Ok(()); } + + self.core.send_internal(&stream, &topic, messages, partitioning).await } } diff --git a/core/sdk/src/clients/send_mode.rs b/core/sdk/src/clients/send_mode.rs index 98b3abef..6ed07d54 100644 --- a/core/sdk/src/clients/send_mode.rs +++ b/core/sdk/src/clients/send_mode.rs @@ -1,10 +1,16 @@ -use iggy_common::IggyDuration; +use std::sync::{atomic::{AtomicUsize, Ordering}, Arc}; -#[derive(Debug, Clone, Default)] +use iggy_common::{Identifier, IggyDuration, IggyError, IggyMessage, Partitioning}; +use tokio::task::JoinHandle; +use tracing::error; + +use super::producer::ProducerCore; + +#[derive(Debug, Clone, Default, PartialEq)] pub enum SendMode { #[default] Sync, - Background(BackgroundConfig), + Background, } #[derive(Debug, Clone)] @@ -25,3 +31,100 @@ pub struct BackgroundConfig { pub batch_size: Option<usize>, pub failure_mode: BackpressureMode, } + +pub struct shardMessage { + pub stream: Arc<Identifier>, + pub topic: Arc<Identifier>, + pub messages: Vec<IggyMessage>, + pub partitioning: Option<Arc<Partitioning>>, +} + +struct shard { + core: Arc<ProducerCore>, + tx: flume::Sender<shardMessage>, + _join_handle: JoinHandle<()>, +} + +impl shard { + fn new(id: usize, producer_core: Arc<ProducerCore>) -> Self { + let (tx, rx) = flume::bounded::<shardMessage>(10); // todo добавить размер в конфигурацию + let core = producer_core.clone(); + let handle = tokio::spawn(async move { + while let Ok(message) = rx.recv_async().await { + // todo поменять на match + core.send_internal(&message.stream, &message.topic, message.messages, message.partitioning).await.map_err(|e| { + error!("{e}"); + }).unwrap(); + } + }); + Self { + core: producer_core, + tx, + _join_handle: handle, + } + } + + async fn send( + &self, + message: shardMessage, + ) -> Result<(), IggyError> { + self.tx.send_async(message).await.map_err(|_| IggyError::BackgroundSendError) + } + + async fn send_timeout(&self, message: shardMessage, timeout: IggyDuration) -> Result<(), IggyError> { + match tokio::time::timeout(timeout.get_duration(), self.tx.send_async(message)).await { + Ok(Ok(())) => Ok(()), + Ok(Err(e)) => Err(IggyError::BackgroundSendTimeout), + Err(_) => Err(IggyError::BackgroundSendTimeout) + } + } + + fn send_with_fail(&self, message: shardMessage) -> Result<(), IggyError> { + self.tx.try_send(message).map_err(|_| IggyError::BackgroundSendError) + } +} + +pub struct Dispatcher { + config: Arc<BackgroundConfig>, + sender: flume::Sender<shardMessage>, + _join_handle: JoinHandle<()>, +} + +impl Dispatcher { + pub fn new(core: Arc<ProducerCore>, num_shards: usize, config: Arc<BackgroundConfig>) -> Self { + let mut shards = Vec::with_capacity(num_shards); + for i in 0..num_shards { + shards.push(shard::new(i, core.clone())); + } + + let (tx, rx) = flume::bounded::<shardMessage>(0); + let sent = AtomicUsize::new(0); + let inner_config = config.clone(); + let handle = tokio::spawn(async move { + loop { + if let Ok(msg) = rx.recv_async().await { + let ix = sent.fetch_add(1, Ordering::SeqCst) % num_shards; + let shard = shards.get(ix).unwrap(); + let result = match inner_config.failure_mode { + BackpressureMode::Block => shard.send(msg).await, + BackpressureMode::BlockWithTimeout(t) => shard.send_timeout(msg, t).await, + BackpressureMode::FailImmediately => shard.send_with_fail(msg), + }; + if let Err(e) = result { + // todo добавить канал для ошибок + error!("{}", e); + } + } + } + }); + Self { + config, + sender: tx, + _join_handle: handle, + } + } + + pub async fn dispatch(&self, msg: shardMessage) -> Result<(), flume::SendError<shardMessage>> { + self.sender.send_async(msg).await + } +}
