This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch feat/add-background-send in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 85334f59ad52bbf1663d3e1a88676ff1e4189ffd Author: haze518 <[email protected]> AuthorDate: Sat May 24 09:22:23 2025 +0600 old variant --- Cargo.lock | 146 ++++++ core/common/src/error/iggy_error.rs | 8 + core/common/src/types/message/iggy_message.rs | 2 +- core/integration/Cargo.toml | 5 + core/integration/benches/send.rs | 102 ++++ core/integration/tests/examples/mod.rs | 1 + .../tests/examples/test_new_publisher.rs | 131 +++++ core/sdk/src/clients/client.rs | 1 + core/sdk/src/clients/mod.rs | 3 +- core/sdk/src/clients/producer.rs | 531 +++++++++++++++------ core/sdk/src/clients/producer_builder.rs | 27 +- core/sdk/src/clients/send_mode.rs | 27 ++ core/sdk/src/prelude.rs | 2 +- 13 files changed, 846 insertions(+), 140 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 96dee5e0..f5ed8db8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -343,6 +343,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + [[package]] name = "anstream" version = "0.6.18" @@ -1260,6 +1266,12 @@ dependencies = [ "thiserror 2.0.12", ] +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "cc" version = "1.2.22" @@ -1327,6 +1339,33 @@ dependencies = [ "windows-link", ] +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + [[package]] name = "cipher" version = "0.4.4" @@ -1620,6 +1659,40 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "criterion" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bf7af66b0989381bd0be551bd7cc91912a655a58c6918420c9527b1fd8b4679" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap", + "criterion-plot", + "itertools 0.13.0", + "num-traits", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_json", + "tinytemplate", + "tokio", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools 0.10.5", +] + [[package]] name = "crossbeam" version = "0.8.4" @@ -3016,6 +3089,16 @@ dependencies = [ "tracing", ] +[[package]] +name = "half" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "459196ed295495a68f7d7fe1d84f6c4b7ff0e21fe3017b2f283c6fac3ad803c9" +dependencies = [ + "cfg-if", + "crunchy", +] + [[package]] name = "handlebars" version = "4.5.0" @@ -3672,6 +3755,7 @@ dependencies = [ "async-trait", "bytes", "chrono", + "criterion", "ctor", "derive_more 2.0.1", "env_logger", @@ -3719,6 +3803,15 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.12.1" @@ -3728,6 +3821,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.14.0" @@ -4564,6 +4666,12 @@ version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +[[package]] +name = "oorandom" +version = "11.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" + [[package]] name = "opaque-debug" version = "0.3.1" @@ -4958,6 +5066,34 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "plotters" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" + +[[package]] +name = "plotters-svg" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" +dependencies = [ + "plotters-backend", +] + [[package]] name = "polonius-the-crab" version = "0.2.1" @@ -6647,6 +6783,16 @@ dependencies = [ "zerovec", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "1.9.0" diff --git a/core/common/src/error/iggy_error.rs b/core/common/src/error/iggy_error.rs index b8076e2e..5e812288 100644 --- a/core/common/src/error/iggy_error.rs +++ b/core/common/src/error/iggy_error.rs @@ -361,6 +361,14 @@ pub enum IggyError { TooSmallMessage(u32, u32) = 4037, #[error("Cannot sed messages due to client disconnection")] CannotSendMessagesDueToClientDisconnection = 4050, + #[error("Background send error")] + BackgroundSendError = 4051, + #[error("Background send timeout")] + BackgroundSendTimeout = 4052, + #[error("Background send buffer is full")] + BackgroundSendBufferFull = 4053, + #[error("Background worker disconnected")] + BackgroundWorkerDisconnected = 4054, #[error("Invalid offset: {0}")] InvalidOffset(u64) = 4100, #[error("Consumer group with ID: {0} for topic with ID: {1} was not found.")] diff --git a/core/common/src/types/message/iggy_message.rs b/core/common/src/types/message/iggy_message.rs index 54c3a841..ddd4076f 100644 --- a/core/common/src/types/message/iggy_message.rs +++ b/core/common/src/types/message/iggy_message.rs @@ -105,7 +105,7 @@ pub const MAX_USER_HEADERS_SIZE: u32 = 100 * 1000; /// .build() /// .unwrap(); /// ``` -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Default)] pub struct IggyMessage { /// Message metadata pub header: IggyMessageHeader, diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml index 46cc8847..1eaf83ce 100644 --- a/core/integration/Cargo.toml +++ b/core/integration/Cargo.toml @@ -32,6 +32,7 @@ assert_cmd = "2.0.17" async-trait = { workspace = true } bytes = { workspace = true } chrono = { workspace = true } +criterion = { version = "0.6.0", features = ["async", "async_tokio"] } ctor = "0.4.2" derive_more = { workspace = true } env_logger = { workspace = true } @@ -55,3 +56,7 @@ tracing-subscriber = { workspace = true } twox-hash = { workspace = true } uuid = { workspace = true } zip = { workspace = true } + +[[bench]] +name = "send" +harness = false diff --git a/core/integration/benches/send.rs b/core/integration/benches/send.rs new file mode 100644 index 00000000..1bb7b316 --- /dev/null +++ b/core/integration/benches/send.rs @@ -0,0 +1,102 @@ +use std::sync::Arc; + +use bytes::Bytes; +use criterion::{criterion_group, criterion_main, Criterion}; +use iggy::clients::send_mode::{BackgroundConfig, BackpressureMode, SendMode}; +use iggy::prelude::*; +use iggy::{clients::client::IggyClient, prelude::TcpClient}; +use iggy_common::TcpClientConfig; +use integration::test_server::{IpAddrKind, TestServer}; +use tokio::runtime::Runtime; + +fn bench_send(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + // SETUP вне замера + let (mut producer, batch): (IggyProducer, Vec<IggyMessage>) = rt.block_on(async { + // старт сервера + let mut server = TestServer::new(None, true, None, IpAddrKind::V4); + server.start(); + + let tcp_client_config = TcpClientConfig { + server_address: server.get_raw_tcp_addr().unwrap(), + ..TcpClientConfig::default() + }; + + let client = Box::new(TcpClient::create(Arc::new(tcp_client_config)).unwrap()); + let client = IggyClient::create(client, None, None); + + client.connect().await.unwrap(); + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + client + .create_stream("sample-stream", Some(1)) + .await + .unwrap(); + client + .create_topic( + &1.try_into().unwrap(), + "sample-topic", + 1, + CompressionAlgorithm::default(), + None, + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .unwrap(); + + let mut producer = client + .producer("1", "1") + .unwrap() + .batch_length(10) + .send_mode(SendMode::Background(BackgroundConfig { + max_in_flight: 10, + in_flight_timeout: None, + batch_size: None, + failure_mode: BackpressureMode::Block, + })) + .build(); + + producer.init().await.unwrap(); + + // заранее создаём батч + let mut messages = Vec::new(); + for _ in 0..10 { + let payload = Bytes::from(vec![0u8; 1024]); + let msg = IggyMessage::builder() + .payload(payload) + .build() + .unwrap(); + messages.push(msg); + } + + (producer, messages) + }); + + c.bench_function("producer.send(batch)", |b| { + b.to_async(&rt).iter(|| { + let batch = create_batch(); + producer.send(batch) + }); + }); +} + +fn create_batch() -> Vec<IggyMessage> { + let mut messages = Vec::new(); + for _ in 0..1 { + let payload = Bytes::from(vec![0u8; 1024]); + let msg = IggyMessage::builder() + .payload(payload) + .build() + .unwrap(); + messages.push(msg); + } + messages +} + +criterion_group!(benches, bench_send); +criterion_main!(benches); diff --git a/core/integration/tests/examples/mod.rs b/core/integration/tests/examples/mod.rs index 015eaa87..8d960e66 100644 --- a/core/integration/tests/examples/mod.rs +++ b/core/integration/tests/examples/mod.rs @@ -20,6 +20,7 @@ mod test_basic; mod test_getting_started; mod test_message_envelope; mod test_message_headers; +mod test_new_publisher; use assert_cmd::Command; use iggy::clients::client::IggyClient; diff --git a/core/integration/tests/examples/test_new_publisher.rs b/core/integration/tests/examples/test_new_publisher.rs new file mode 100644 index 00000000..771fac8f --- /dev/null +++ b/core/integration/tests/examples/test_new_publisher.rs @@ -0,0 +1,131 @@ +// temporary file, do not forget to delete! + +use std::str::FromStr; +use std::sync::Arc; +use std::time::Instant; + +use bytes::Bytes; +use futures::StreamExt; +use iggy::clients::send_mode::{BackgroundConfig, BackpressureMode, SendMode}; +use iggy::prelude::defaults::*; +use iggy::prelude::*; +use iggy::{clients::client::IggyClient, prelude::TcpClient}; +use iggy_common::TcpClientConfig; +use integration::test_server::{IpAddrKind, TestServer}; + +#[tokio::test] +async fn test_new_publisher() { + // new + let mut server = TestServer::new(None, true, None, IpAddrKind::V4); + server.start(); + + let tcp_client_config = TcpClientConfig { + server_address: server.get_raw_tcp_addr().unwrap(), + ..TcpClientConfig::default() + }; + let client = Box::new(TcpClient::create(Arc::new(tcp_client_config)).unwrap()); + let client = IggyClient::create(client, None, None); + + // setup + client.connect().await.unwrap(); + let ping_result = client.ping().await; + assert!(ping_result.is_ok(), "Failed to ping server"); + + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + client + .create_stream("sample-stream", Some(1)) + .await + .unwrap(); + client + .create_topic( + &1.try_into().unwrap(), + "sample-topic", + 1, + CompressionAlgorithm::default(), + None, + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .unwrap(); + + let mut producer = client + .producer("1", "1") + .unwrap() + .batch_length(10) + .send_mode(SendMode::Sync) + // .send_mode(SendMode::Background(BackgroundConfig { + // max_in_flight: 10, + // in_flight_timeout: None, + // batch_size: None, + // failure_mode: BackpressureMode::Block, + // })) + .build(); + + producer.init().await.unwrap(); + + // produce + let mut send_batches = 0; + + let batch_limit = 10000; + let mut count = 0; + + let mut t = Vec::new(); + while send_batches < batch_limit { + let start = Instant::now(); + + let mut messages = Vec::new(); + for _ in 0..10 { + let payload = Bytes::from(vec![0u8; 1024]); + let msg = IggyMessage::builder() + .payload(payload) + .build() + .unwrap(); + messages.push(msg); + count += 1; + } + + producer.send(messages).await.unwrap(); + send_batches += 1; + + let duration = start.elapsed().as_millis(); + t.push(duration); + } + + // let mut consumer = client + // .consumer_group("some-consumer", "1", "1") + // .unwrap() + // .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages)) + // .create_consumer_group_if_not_exists() + // .auto_join_consumer_group() + // .polling_strategy(PollingStrategy::next()) + // .poll_interval(IggyDuration::from_str("1ms").unwrap()) + // .batch_length(10) + // .build(); + + // consumer.init().await.unwrap(); + + // let mut consumed_batches = 0; + // while let Some(message) = consumer.next().await { + // if consumed_batches >= batch_limit { + // break + // } + + // if let Ok(message) = message { + // // println!("got message"); + // consumed_batches += 1; + // } else if let Err(error) = message { + // panic!("{}", error.to_string()); + // } + // } + + let total: u128 = t.iter().sum(); + let avg = total as f64 / t.len() as f64; + println!("Среднее время выполнения одного батча: {:.3} мс", avg); +} + +// sync: Время выполнения: 13.937269459s diff --git a/core/sdk/src/clients/client.rs b/core/sdk/src/clients/client.rs index 3e6e1c2b..6bbd224a 100644 --- a/core/sdk/src/clients/client.rs +++ b/core/sdk/src/clients/client.rs @@ -155,6 +155,7 @@ impl IggyClient { topic.to_owned(), self.encryptor.clone(), None, + )) } } diff --git a/core/sdk/src/clients/mod.rs b/core/sdk/src/clients/mod.rs index be854f1b..7067d2c2 100644 --- a/core/sdk/src/clients/mod.rs +++ b/core/sdk/src/clients/mod.rs @@ -32,6 +32,7 @@ pub mod consumer; pub mod consumer_builder; pub mod producer; pub mod producer_builder; +pub mod send_mode; const ORDERING: std::sync::atomic::Ordering = std::sync::atomic::Ordering::SeqCst; -const MAX_BATCH_SIZE: usize = 1000000; +const MAX_BATCH_LENGTH: usize = 1000000; diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs index f3706e3a..c0457538 100644 --- a/core/sdk/src/clients/producer.rs +++ b/core/sdk/src/clients/producer.rs @@ -1,3 +1,4 @@ +use super::send_mode::{BackpressureMode, SendMode}; /* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -use super::{MAX_BATCH_SIZE, ORDERING}; +use super::{MAX_BATCH_LENGTH, ORDERING}; use bytes::Bytes; use futures_util::StreamExt; @@ -25,12 +26,19 @@ use iggy_common::{ CompressionAlgorithm, DiagnosticEvent, EncryptorKind, IdKind, Identifier, IggyDuration, IggyError, IggyExpiry, IggyMessage, IggyTimestamp, MaxTopicSize, Partitioner, Partitioning, }; +use std::fmt::Debug; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU64}; -use std::time::Duration; +use std::time::{Duration, Instant}; +use tokio::sync::Semaphore; +use tokio::task::JoinHandle; use tokio::time::{Interval, sleep}; use tracing::{error, info, trace, warn}; +pub trait ErrorCallback: Send + Sync + Debug { + fn call(&self, error: IggyError, messages: Vec<IggyMessage>); +} + unsafe impl Send for IggyProducer {} unsafe impl Sync for IggyProducer {} @@ -44,6 +52,7 @@ pub struct IggyProducer { topic_name: String, batch_length: Option<usize>, partitioning: Option<Arc<Partitioning>>, + send_mode: Arc<SendMode>, encryptor: Option<Arc<EncryptorKind>>, partitioner: Option<Arc<dyn Partitioner>>, linger_time_micros: u64, @@ -58,6 +67,11 @@ pub struct IggyProducer { last_sent_at: Arc<AtomicU64>, send_retries_count: Option<u32>, send_retries_interval: Option<IggyDuration>, + + _join_handle: Option<JoinHandle<()>>, + sema: Arc<Semaphore>, + sender: Option<Arc<flume::Sender<Vec<IggyMessage>>>>, + error_callback: Option<Arc<dyn ErrorCallback>>, } impl IggyProducer { @@ -72,7 +86,7 @@ impl IggyProducer { partitioning: Option<Partitioning>, encryptor: Option<Arc<EncryptorKind>>, partitioner: Option<Arc<dyn Partitioner>>, - interval: Option<IggyDuration>, + linger_time: Option<IggyDuration>, create_stream_if_not_exists: bool, create_topic_if_not_exists: bool, topic_partitions_count: u32, @@ -81,6 +95,8 @@ impl IggyProducer { topic_max_size: MaxTopicSize, send_retries_count: Option<u32>, send_retries_interval: Option<IggyDuration>, + send_mode: SendMode, + error_callback: Option<Arc<dyn ErrorCallback>>, ) -> Self { Self { initialized: false, @@ -92,9 +108,10 @@ impl IggyProducer { topic_name, batch_length, partitioning: partitioning.map(Arc::new), + send_mode: Arc::new(send_mode), encryptor, partitioner, - linger_time_micros: interval.map_or(0, |i| i.as_micros()), + linger_time_micros: linger_time.map_or(0, |i| i.as_micros()), create_stream_if_not_exists, create_topic_if_not_exists, topic_partitions_count, @@ -102,10 +119,14 @@ impl IggyProducer { topic_message_expiry, topic_max_size, default_partitioning: Arc::new(Partitioning::balanced()), - can_send_immediately: interval.is_none(), + can_send_immediately: linger_time.is_none(), last_sent_at: Arc::new(AtomicU64::new(0)), send_retries_count, send_retries_interval, + _join_handle: None, + sema: Arc::new(Semaphore::new(10)), + sender: None, + error_callback, } } @@ -179,8 +200,84 @@ impl IggyProducer { .await?; } + let (tx, rx) = flume::bounded::<Vec<IggyMessage>>(0); // todo задать какое-то значение + let stream_id = self.stream_id.clone(); + let topic_id = self.topic_id.clone(); + let partitioning = self.partitioning.clone(); + let default_partitioning = self.default_partitioning.clone(); + let partitioner = self.partitioner.clone(); + let client = self.client.clone(); + let send_retries_count = self.send_retries_count.clone(); + let send_retries_interval = self.send_retries_interval.clone(); + let can_send = self.can_send.clone(); + let sema = self.sema.clone(); + + let handle = tokio::spawn(async move { + while let Ok(mut batch) = rx.recv_async().await { + // let sema = sema.clone(); + // let partitioner = partitioner.clone(); + // let stream_id = stream_id.clone(); + // let topic_id = topic_id.clone(); + // let partitioning = partitioning.clone(); + // let default_partitioning = default_partitioning.clone(); + // let client = client.clone(); + // let can_send = can_send.clone(); + + let partitioning = get_partitioning( + &partitioner, + &stream_id, + &topic_id, + &batch, + partitioning.clone(), + &partitioning, + default_partitioning.clone(), + ) + .unwrap(); + try_send_messages( + client.clone(), + send_retries_count, + send_retries_interval, + can_send.clone(), + &stream_id, + &topic_id, + &partitioning, + &mut batch, + ) + .await + .unwrap(); + // tokio::spawn(async move { + // let _permit = sema.acquire().await.unwrap(); + // let partitioning = get_partitioning( + // &partitioner, + // &stream_id, + // &topic_id, + // &batch, + // partitioning.clone(), + // &partitioning, + // default_partitioning.clone(), + // ) + // .unwrap(); + // try_send_messages( + // client.clone(), + // send_retries_count, + // send_retries_interval, + // can_send.clone(), + // &stream_id, + // &topic_id, + // &partitioning, + // &mut batch, + // ).await.unwrap(); + // }); + } + }); + self._join_handle = Some(handle); + self.sender = Some(Arc::new(tx)); self.initialized = true; - info!("Producer has been initialized for stream: {stream_id} and topic: {topic_id}."); + info!( + "Producer has been initialized for stream: {} and topic: {}.", + self.stream_id.clone(), + self.topic_id.clone() + ); Ok(()) } @@ -242,6 +339,12 @@ impl IggyProducer { .await } + // todod добавить канал для считывания + pub async fn send_async(&self, messages: Vec<IggyMessage>) { + let sender = self.sender.clone(); + sender.unwrap().send_async(messages).await.unwrap(); + } + pub async fn send_one(&self, message: IggyMessage) -> Result<(), IggyError> { self.send(vec![message]).await } @@ -293,6 +396,7 @@ impl IggyProducer { .await } + // TODO add batch_size async fn send_buffered( &self, stream: Arc<Identifier>, @@ -301,8 +405,18 @@ impl IggyProducer { partitioning: Option<Arc<Partitioning>>, ) -> Result<(), IggyError> { self.encrypt_messages(&mut messages)?; - let partitioning = self.get_partitioning(&stream, &topic, &messages, partitioning)?; - let batch_length = self.batch_length.unwrap_or(MAX_BATCH_SIZE); + let default_partitioning = self.default_partitioning.clone(); + let partitioner = self.partitioner.clone(); + let partitioning = get_partitioning( + &partitioner, + &stream, + &topic, + &messages, + partitioning, + &self.partitioning, + default_partitioning, + )?; + let batch_length = self.batch_length.unwrap_or(MAX_BATCH_LENGTH); let batches = messages.chunks_mut(batch_length); let mut current_batch = 1; let batches_count = batches.len(); @@ -321,8 +435,30 @@ impl IggyProducer { ); self.last_sent_at .store(IggyTimestamp::now().into(), ORDERING); - self.try_send_messages(&self.stream_id, &self.topic_id, &partitioning, batch) - .await?; + + let client = self.client.clone(); + let send_retries_count = self.send_retries_count.clone(); + let send_retries_interval = self.send_retries_interval.clone(); + let can_send = self.can_send.clone(); + + let sender = self.sender.clone(); + let send_mode = self.send_mode.clone(); + try_send_messages_new( + BackpressureMode::Block, + sender, + self.error_callback.clone(), + send_mode, + client, + send_retries_count, + send_retries_interval, + can_send, + &self.stream_id, + &self.topic_id, + &partitioning, + batch, + ) + .await?; + trace!("Sent {messages_count} messages ({current_batch}/{batches_count} batch(es))."); current_batch += 1; } @@ -338,21 +474,56 @@ impl IggyProducer { ) -> Result<(), IggyError> { trace!("No batch size specified, sending messages immediately."); self.encrypt_messages(&mut messages)?; - let partitioning = self.get_partitioning(stream, topic, &messages, partitioning)?; - let batch_length = self.batch_length.unwrap_or(MAX_BATCH_SIZE); + let default_partitioning = self.default_partitioning.clone(); + let partitioner = self.partitioner.clone(); + let client = self.client.clone(); + let send_retries_count = self.send_retries_count.clone(); + let send_retries_interval = self.send_retries_interval.clone(); + let can_send = self.can_send.clone(); + + let partitioning = get_partitioning( + &partitioner, + &stream, + &topic, + &messages, + partitioning, + &self.partitioning, + default_partitioning, + )?; + let batch_length = self.batch_length.unwrap_or(MAX_BATCH_LENGTH); if messages.len() <= batch_length { self.last_sent_at .store(IggyTimestamp::now().into(), ORDERING); - self.try_send_messages(stream, topic, &partitioning, &mut messages) - .await?; + try_send_messages( + client.clone(), + send_retries_count, + send_retries_interval, + can_send, + stream, + topic, + &partitioning, + &mut messages, + ) + .await?; return Ok(()); } for batch in messages.chunks_mut(batch_length) { + let client = client.clone(); + let can_send = can_send.clone(); self.last_sent_at .store(IggyTimestamp::now().into(), ORDERING); - self.try_send_messages(stream, topic, &partitioning, batch) - .await?; + try_send_messages( + client, + send_retries_count, + send_retries_interval, + can_send, + stream, + topic, + &partitioning, + batch, + ) + .await?; } Ok(()) } @@ -385,144 +556,236 @@ impl IggyProducer { } Ok(()) } +} - async fn try_send_messages( - &self, - stream: &Identifier, - topic: &Identifier, - partitioning: &Arc<Partitioning>, - messages: &mut [IggyMessage], - ) -> Result<(), IggyError> { - let client = self.client.read().await; - let Some(max_retries) = self.send_retries_count else { - return client - .send_messages(stream, topic, partitioning, messages) - .await; - }; +fn get_partitioning( + partitioner: &Option<Arc<dyn Partitioner>>, + stream: &Identifier, + topic: &Identifier, + messages: &[IggyMessage], + partitioning: Option<Arc<Partitioning>>, + producer_partitioning: &Option<Arc<Partitioning>>, + default_partitioning: Arc<Partitioning>, +) -> Result<Arc<Partitioning>, IggyError> { + if let Some(partitioner) = partitioner { + trace!("Calculating partition id using custom partitioner."); + let partition_id = partitioner.calculate_partition_id(stream, topic, messages)?; + Ok(Arc::new(Partitioning::partition_id(partition_id))) + } else { + trace!("Using the provided partitioning."); + Ok(partitioning.unwrap_or_else(|| { + producer_partitioning + .clone() + .unwrap_or_else(|| default_partitioning.clone()) + })) + } +} - if max_retries == 0 { - return client - .send_messages(stream, topic, partitioning, messages) - .await; +async fn wait_until_connected( + can_send: Arc<AtomicBool>, + max_retries: u32, + stream: &Identifier, + topic: &Identifier, + timer: &mut Option<Interval>, +) -> Result<(), IggyError> { + let mut retries = 0; + while !can_send.load(ORDERING) { + retries += 1; + if retries > max_retries { + error!( + "Failed to send messages to topic: {topic}, stream: {stream} \ + after {max_retries} retries. Client is disconnected." + ); + return Err(IggyError::CannotSendMessagesDueToClientDisconnection); } - let mut timer = if let Some(interval) = self.send_retries_interval { - let mut timer = tokio::time::interval(interval.get_duration()); - timer.tick().await; - Some(timer) - } else { - None - }; + error!( + "Trying to send messages to topic: {topic}, stream: {stream} \ + but the client is disconnected. Retrying {retries}/{max_retries}..." + ); - self.wait_until_connected(max_retries, stream, topic, &mut timer) - .await?; - self.send_with_retries( - max_retries, - stream, - topic, - partitioning, - messages, - &mut timer, - ) - .await + if let Some(timer) = timer.as_mut() { + trace!( + "Waiting for the next retry to send messages to topic: {topic}, \ + stream: {stream} for disconnected client..." + ); + timer.tick().await; + } } + Ok(()) +} + +async fn send_with_retries( + client: Arc<IggySharedMut<Box<dyn Client>>>, + max_retries: u32, + stream: &Identifier, + topic: &Identifier, + partitioning: &Arc<Partitioning>, + messages: &mut [IggyMessage], + timer: &mut Option<Interval>, +) -> Result<(), IggyError> { + let client = client.read().await; + let mut retries = 0; + loop { + match client + .send_messages(stream, topic, partitioning, messages) + .await + { + Ok(_) => return Ok(()), + Err(error) => { + retries += 1; + if retries > max_retries { + error!( + "Failed to send messages to topic: {topic}, stream: {stream} \ + after {max_retries} retries. {error}." + ); + return Err(error); + } - async fn wait_until_connected( - &self, - max_retries: u32, - stream: &Identifier, - topic: &Identifier, - timer: &mut Option<Interval>, - ) -> Result<(), IggyError> { - let mut retries = 0; - while !self.can_send.load(ORDERING) { - retries += 1; - if retries > max_retries { error!( - "Failed to send messages to topic: {topic}, stream: {stream} \ - after {max_retries} retries. Client is disconnected." + "Failed to send messages to topic: {topic}, stream: {stream}. \ + {error} Retrying {retries}/{max_retries}..." ); - return Err(IggyError::CannotSendMessagesDueToClientDisconnection); - } - - error!( - "Trying to send messages to topic: {topic}, stream: {stream} \ - but the client is disconnected. Retrying {retries}/{max_retries}..." - ); - if let Some(timer) = timer.as_mut() { - trace!( - "Waiting for the next retry to send messages to topic: {topic}, \ - stream: {stream} for disconnected client..." - ); - timer.tick().await; + if let Some(t) = timer.as_mut() { + trace!( + "Waiting for the next retry to send messages to topic: {topic}, \ + stream: {stream}..." + ); + t.tick().await; + } } } - Ok(()) } +} - async fn send_with_retries( - &self, - max_retries: u32, - stream: &Identifier, - topic: &Identifier, - partitioning: &Arc<Partitioning>, - messages: &mut [IggyMessage], - timer: &mut Option<Interval>, - ) -> Result<(), IggyError> { - let client = self.client.read().await; - let mut retries = 0; - loop { - match client - .send_messages(stream, topic, partitioning, messages) - .await - { - Ok(_) => return Ok(()), - Err(error) => { - retries += 1; - if retries > max_retries { - error!( - "Failed to send messages to topic: {topic}, stream: {stream} \ - after {max_retries} retries. {error}." - ); - return Err(error); +async fn try_send_messages( + client: Arc<IggySharedMut<Box<dyn Client>>>, + send_retries_count: Option<u32>, + send_retries_interval: Option<IggyDuration>, + can_send: Arc<AtomicBool>, + stream: &Identifier, + topic: &Identifier, + partitioning: &Arc<Partitioning>, + messages: &mut [IggyMessage], +) -> Result<(), IggyError> { + let rw_client = client.read().await; + let Some(max_retries) = send_retries_count else { + return rw_client + .send_messages(stream, topic, partitioning, messages) + .await; + }; + + if max_retries == 0 { + return rw_client + .send_messages(stream, topic, partitioning, messages) + .await; + } + + let mut timer = if let Some(interval) = send_retries_interval { + let mut timer = tokio::time::interval(interval.get_duration()); + timer.tick().await; + Some(timer) + } else { + None + }; + + wait_until_connected(can_send.clone(), max_retries, stream, topic, &mut timer).await?; + send_with_retries( + client.clone(), + max_retries, + stream, + topic, + partitioning, + messages, + &mut timer, + ) + .await +} + +async fn try_send_messages_new( + backpressure_mode: BackpressureMode, + sender: Option<Arc<flume::Sender<Vec<IggyMessage>>>>, + error_callback: Option<Arc<dyn ErrorCallback>>, + send_mode: Arc<SendMode>, + client: Arc<IggySharedMut<Box<dyn Client>>>, + send_retries_count: Option<u32>, + send_retries_interval: Option<IggyDuration>, + can_send: Arc<AtomicBool>, + stream: &Identifier, + topic: &Identifier, + partitioning: &Arc<Partitioning>, + messages: &mut [IggyMessage], +) -> Result<(), IggyError> { + match &*send_mode { + SendMode::Sync => { + try_send_messages( + client, + send_retries_count, + send_retries_interval, + can_send, + stream, + topic, + partitioning, + messages, + ) + .await + } + + SendMode::Background(_cfg) => { + let out: Vec<IggyMessage> = messages.iter_mut().map(std::mem::take).collect(); + let sender = sender.as_ref().expect("init() must set sender").clone(); + + match sender.try_send(out) { + Ok(()) => Ok(()), + + Err(flume::TrySendError::Full(out)) => match backpressure_mode { + BackpressureMode::Block => match sender.send_async(out).await { + Ok(()) => Ok(()), + Err(flume::SendError(out)) => { + if let Some(cb) = error_callback { + cb.call(IggyError::BackgroundSendError, out); + } + Err(IggyError::BackgroundSendError) + } + }, + + BackpressureMode::BlockWithTimeout(t) => { + match tokio::time::timeout(t.get_duration(), sender.send_async(out)).await { + Ok(Ok(())) => Ok(()), + + Ok(Err(flume::SendError(out))) => { + if let Some(cb) = error_callback { + cb.call(IggyError::BackgroundSendTimeout, out); + } + Err(IggyError::BackgroundSendTimeout) + } + + Err(_) => { + if let Some(cb) = &error_callback { + cb.call(IggyError::BackgroundSendTimeout, vec![]); + } + Err(IggyError::BackgroundSendTimeout) + } + } } - error!( - "Failed to send messages to topic: {topic}, stream: {stream}. \ - {error} Retrying {retries}/{max_retries}..." - ); + BackpressureMode::FailImmediately => { + if let Some(cb) = &error_callback { + cb.call(IggyError::BackgroundSendBufferFull, out); + } + Err(IggyError::BackgroundSendBufferFull) + } + }, - if let Some(t) = timer.as_mut() { - trace!( - "Waiting for the next retry to send messages to topic: {topic}, \ - stream: {stream}..." - ); - t.tick().await; + Err(flume::TrySendError::Disconnected(out)) => { + if let Some(cb) = &error_callback { + cb.call(IggyError::BackgroundWorkerDisconnected, out); } + error!("Background worker has shut down."); + Err(IggyError::BackgroundWorkerDisconnected) } } } } - - fn get_partitioning( - &self, - stream: &Identifier, - topic: &Identifier, - messages: &[IggyMessage], - partitioning: Option<Arc<Partitioning>>, - ) -> Result<Arc<Partitioning>, IggyError> { - if let Some(partitioner) = &self.partitioner { - trace!("Calculating partition id using custom partitioner."); - let partition_id = partitioner.calculate_partition_id(stream, topic, messages)?; - Ok(Arc::new(Partitioning::partition_id(partition_id))) - } else { - trace!("Using the provided partitioning."); - Ok(partitioning.unwrap_or_else(|| { - self.partitioning - .clone() - .unwrap_or_else(|| self.default_partitioning.clone()) - })) - } - } } diff --git a/core/sdk/src/clients/producer_builder.rs b/core/sdk/src/clients/producer_builder.rs index 76ecfeea..4d9590f4 100644 --- a/core/sdk/src/clients/producer_builder.rs +++ b/core/sdk/src/clients/producer_builder.rs @@ -15,8 +15,9 @@ // specific language governing permissions and limitations // under the License. -use super::MAX_BATCH_SIZE; -use crate::prelude::IggyProducer; +use super::send_mode::{self, SendMode}; +use super::MAX_BATCH_LENGTH; +use crate::prelude::{IggyProducer, ErrorCallback}; use iggy_binary_protocol::Client; use iggy_common::locking::IggySharedMut; use iggy_common::{ @@ -36,6 +37,7 @@ pub struct IggyProducerBuilder { encryptor: Option<Arc<EncryptorKind>>, partitioner: Option<Arc<dyn Partitioner>>, linger_time: Option<IggyDuration>, + send_mode: SendMode, create_stream_if_not_exists: bool, create_topic_if_not_exists: bool, topic_partitions_count: u32, @@ -44,6 +46,7 @@ pub struct IggyProducerBuilder { send_retries_interval: Option<IggyDuration>, topic_message_expiry: IggyExpiry, topic_max_size: MaxTopicSize, + error_callback: Option<Arc<dyn ErrorCallback>> } impl IggyProducerBuilder { @@ -67,6 +70,7 @@ impl IggyProducerBuilder { partitioning: None, encryptor, partitioner, + send_mode: SendMode::default(), linger_time: Some(IggyDuration::from(1000)), create_stream_if_not_exists: true, create_topic_if_not_exists: true, @@ -76,6 +80,7 @@ impl IggyProducerBuilder { topic_max_size: MaxTopicSize::ServerDefault, send_retries_count: Some(3), send_retries_interval: Some(IggyDuration::ONE_SECOND), + error_callback: None, } } @@ -95,7 +100,7 @@ impl IggyProducerBuilder { batch_length: if batch_length == 0 { None } else { - Some(batch_length.min(MAX_BATCH_SIZE as u32) as usize) + Some(batch_length.min(MAX_BATCH_LENGTH as u32) as usize) }, ..self } @@ -125,6 +130,20 @@ impl IggyProducerBuilder { } } + pub fn send_mode(self, send_mode: SendMode) -> Self { + Self { + send_mode, + ..self + } + } + + pub fn error_callback(self, cb: Arc<dyn ErrorCallback>) -> Self { + Self { + error_callback: Some(cb), + ..self + } + } + /// Sets the encryptor for encrypting the messages' payloads. pub fn encryptor(self, encryptor: Arc<EncryptorKind>) -> Self { Self { @@ -249,6 +268,8 @@ impl IggyProducerBuilder { self.topic_max_size, self.send_retries_count, self.send_retries_interval, + self.send_mode, + self.error_callback, ) } } diff --git a/core/sdk/src/clients/send_mode.rs b/core/sdk/src/clients/send_mode.rs new file mode 100644 index 00000000..98b3abef --- /dev/null +++ b/core/sdk/src/clients/send_mode.rs @@ -0,0 +1,27 @@ +use iggy_common::IggyDuration; + +#[derive(Debug, Clone, Default)] +pub enum SendMode { + #[default] + Sync, + Background(BackgroundConfig), +} + +#[derive(Debug, Clone)] +/// Determines how the `send_messages` API should behave when problem is encountered +pub enum BackpressureMode { + /// Block until the send succeeds + Block, + /// Block with a timeout, after which the send fails + BlockWithTimeout(IggyDuration), + /// Fail immediately without retrying + FailImmediately, +} + +#[derive(Debug, Clone)] +pub struct BackgroundConfig { + pub max_in_flight: usize, + pub in_flight_timeout: Option<IggyDuration>, + pub batch_size: Option<usize>, + pub failure_mode: BackpressureMode, +} diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs index 3d87cc82..9b13f218 100644 --- a/core/sdk/src/prelude.rs +++ b/core/sdk/src/prelude.rs @@ -35,7 +35,7 @@ pub use crate::clients::consumer::{ AutoCommit, AutoCommitAfter, AutoCommitWhen, IggyConsumer, ReceivedMessage, }; pub use crate::clients::consumer_builder::IggyConsumerBuilder; -pub use crate::clients::producer::IggyProducer; +pub use crate::clients::producer::{IggyProducer, ErrorCallback}; pub use crate::clients::producer_builder::IggyProducerBuilder; pub use crate::consumer_ext::IggyConsumerMessageExt; pub use crate::stream_builder::IggyConsumerConfig;
