This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch feat/add-background-send in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 05468f8a173802c4b8984179106c0e24723f5505 Author: haze518 <[email protected]> AuthorDate: Tue Jun 3 22:07:43 2025 +0600 del --- .../tests/examples/test_new_publisher.rs | 69 +++--- core/sdk/src/clients/mod.rs | 3 + core/sdk/src/clients/producer.rs | 72 +----- core/sdk/src/clients/producer_builder.rs | 25 +- core/sdk/src/clients/producer_config.rs | 52 ++++ core/sdk/src/clients/producer_dispatcher.rs | 266 +++------------------ core/sdk/src/clients/producer_error_callback.rs | 70 ++++++ core/sdk/src/clients/producer_sharding.rs | 200 ++++++++++++++++ 8 files changed, 403 insertions(+), 354 deletions(-) diff --git a/core/integration/tests/examples/test_new_publisher.rs b/core/integration/tests/examples/test_new_publisher.rs index af3ba077..2c0e0e84 100644 --- a/core/integration/tests/examples/test_new_publisher.rs +++ b/core/integration/tests/examples/test_new_publisher.rs @@ -43,6 +43,7 @@ async fn test_new_publisher() { let producer = client .producer("1", "1") .unwrap() + // .background(|b| b.batch_length(10)) .sync(|b| b.batch_length(10)) // .send_mode(SendMode::Background) .build(); @@ -50,7 +51,7 @@ async fn test_new_publisher() { producer.init().await.unwrap(); let mut t = Vec::new(); - let batches_to_send = 10_000; + let batches_to_send = 10_00; let messages_per_batch = 10; let total_expected = batches_to_send * messages_per_batch; @@ -71,37 +72,37 @@ async fn test_new_publisher() { t.push(start.elapsed().as_millis()); } - let mut consumer = client - .consumer_group("some-consumer", "1", "1") - .unwrap() - .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages)) - .create_consumer_group_if_not_exists() - .auto_join_consumer_group() - .polling_strategy(PollingStrategy::next()) - .poll_interval(IggyDuration::from_str("1ms").unwrap()) - .batch_length(10) - .build(); - - consumer.init().await.unwrap(); - - let mut received = 0; - while let Some(msg) = consumer.next().await { - match msg { - Ok(_) => { - received += 1; - if received >= total_expected 
{ - break; - } - } - Err(e) => panic!("Consumer error: {}", e), - } - } - - assert_eq!( - received, total_expected, - "Not all messages received: got {}, expected {}", - received, total_expected - ); + // let mut consumer = client + // .consumer_group("some-consumer", "1", "1") + // .unwrap() + // .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages)) + // .create_consumer_group_if_not_exists() + // .auto_join_consumer_group() + // .polling_strategy(PollingStrategy::next()) + // .poll_interval(IggyDuration::from_str("1ms").unwrap()) + // .batch_length(10) + // .build(); + + // consumer.init().await.unwrap(); + + // let mut received = 0; + // while let Some(msg) = consumer.next().await { + // match msg { + // Ok(_) => { + // received += 1; + // if received >= total_expected { + // break; + // } + // } + // Err(e) => panic!("Consumer error: {}", e), + // } + // } + + // assert_eq!( + // received, total_expected, + // "Not all messages received: got {}, expected {}", + // received, total_expected + // ); let total: u128 = t.iter().sum(); let avg = total as f64 / t.len() as f64; @@ -111,3 +112,7 @@ async fn test_new_publisher() { // sync: avg send: 1.561ms; overall: 46.71s // async: avg send: 0.356ms; overall: 28.67s + +// master sync: 44s +// new sync: 44.3s +// new async: 43s diff --git a/core/sdk/src/clients/mod.rs b/core/sdk/src/clients/mod.rs index 96916967..f46bc2a4 100644 --- a/core/sdk/src/clients/mod.rs +++ b/core/sdk/src/clients/mod.rs @@ -33,6 +33,9 @@ pub mod consumer_builder; pub mod producer; pub mod producer_builder; pub mod producer_dispatcher; +pub mod producer_config; +pub mod producer_error_callback; +pub mod producer_sharding; const ORDERING: std::sync::atomic::Ordering = std::sync::atomic::Ordering::SeqCst; const MAX_BATCH_LENGTH: usize = 1000000; diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs index b2014d9f..88a44204 100644 --- a/core/sdk/src/clients/producer.rs +++ b/core/sdk/src/clients/producer.rs @@ 
-1,4 +1,3 @@ -use super::producer_builder::{SendMode, SyncConfig}; /* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,9 +15,11 @@ use super::producer_builder::{SendMode, SyncConfig}; * specific language governing permissions and limitations * under the License. */ -use super::{MAX_BATCH_LENGTH, ORDERING}; +use super::ORDERING; -use super::producer_dispatcher::{BackgroundConfig, BalancedSharding, ProducerDispatcher, Sharding}; +use crate::clients::producer_builder::SendMode; +use crate::clients::producer_config::SyncConfig; +use crate::clients::producer_dispatcher::ProducerDispatcher; use bytes::Bytes; use futures_util::StreamExt; use iggy_binary_protocol::Client; @@ -27,8 +28,6 @@ use iggy_common::{ CompressionAlgorithm, DiagnosticEvent, EncryptorKind, IdKind, Identifier, IggyDuration, IggyError, IggyExpiry, IggyMessage, IggyTimestamp, MaxTopicSize, Partitioner, Partitioning, }; -use std::fmt::Debug; -use std::pin::Pin; use std::sync::Arc; use std::sync::atomic::Ordering; use std::sync::atomic::{AtomicBool, AtomicU64}; @@ -36,20 +35,6 @@ use std::time::Duration; use tokio::time::{Interval, sleep}; use tracing::{error, info, trace, warn}; -pub trait SendStrategy: Send + Sync + 'static { - fn send_batch( - &self, - stream: &Identifier, - topic: &Identifier, - msgs: Vec<IggyMessage>, - partitioning: Option<Arc<Partitioning>>, - ) -> impl Future<Output = Result<(), IggyError>> + Send; -} - -pub trait AsyncSendStrategy: SendStrategy { - fn flush(&self) -> Result<(), IggyError>; -} - pub struct ProducerCore { initialized: AtomicBool, can_send: Arc<AtomicBool>, @@ -217,7 +202,7 @@ impl ProducerCore { Some(t) => t.as_micros(), None => 0 }; - if !cfg.can_send_immediately && linger_time_micros > 0 { + if linger_time_micros > 0 { Self::wait_before_sending( linger_time_micros, self.last_sent_at.load(ORDERING), @@ -429,53 +414,6 @@ impl 
ProducerCore { } } -pub struct ErrorCtx { - pub cause: String, - pub stream: Arc<Identifier>, - pub topic: Arc<Identifier>, - pub partitioning: Option<Arc<Partitioning>>, - pub messages: Arc<Vec<IggyMessage>>, -} - -pub trait ErrorCallback: Send + Sync + Debug + 'static { - fn call(&self, ctx: ErrorCtx) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>; -} - -pub struct LogErrorCallback; - -impl Default for LogErrorCallback { - fn default() -> Self { - Self - } -} - -impl std::fmt::Debug for LogErrorCallback { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("LogErrorCallback").finish() - } -} - -impl ErrorCallback for LogErrorCallback { - fn call(&self, ctx: ErrorCtx) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> { - Box::pin(async move { - let partitioning = ctx - .partitioning - .as_ref() - .map(|p| format!("{:?}", p)) - .unwrap_or_else(|| "None".to_string()); - - error!( - cause = ctx.cause, - stream = %ctx.stream, - topic = %ctx.topic, - partitioning = %partitioning, - num_messages = ctx.messages.len(), - "Failed to send messages in background task", - ); - }) - } -} - unsafe impl Send for IggyProducer {} unsafe impl Sync for IggyProducer {} diff --git a/core/sdk/src/clients/producer_builder.rs b/core/sdk/src/clients/producer_builder.rs index f92a5da1..6f3e001b 100644 --- a/core/sdk/src/clients/producer_builder.rs +++ b/core/sdk/src/clients/producer_builder.rs @@ -16,8 +16,9 @@ // under the License. 
use super::MAX_BATCH_LENGTH; -use super::producer::LogErrorCallback; -use super::producer_dispatcher::{BackgroundConfig, BackpressureMode, BalancedSharding, Sharding}; +use crate::clients::producer_config::{BackgroundConfig, BackpressureMode, SyncConfig}; +use crate::clients::producer_error_callback::LogErrorCallback; +use crate::clients::producer_sharding::{BalancedSharding, Sharding}; use crate::prelude::{ErrorCallback, IggyProducer}; use iggy_binary_protocol::Client; use iggy_common::locking::IggySharedMut; @@ -27,17 +28,8 @@ use iggy_common::{ }; use std::sync::Arc; -// TODO move here BackgroundConfig - -#[derive(Clone)] -pub struct SyncConfig { - pub batch_length: usize, - pub linger_time: Option<IggyDuration>, - pub can_send_immediately: bool, -} - pub struct BackgroundBuilder { - max_in_flight: Option<usize>, // TODO rename to shards_count + num_shards: Option<usize>, batch_size: Option<usize>, batch_length: Option<usize>, failure_mode: Option<BackpressureMode>, @@ -51,7 +43,7 @@ pub struct BackgroundBuilder { impl Default for BackgroundBuilder { fn default() -> Self { BackgroundBuilder { - max_in_flight: Some(default_shard_count()), + num_shards: Some(default_shard_count()), sharding: Box::new(BalancedSharding::default()), error_callback: Box::new(LogErrorCallback::default()), batch_size: Some(1_048_576), @@ -100,9 +92,9 @@ impl BackgroundBuilder { } } - pub fn max_in_flight(self, value: usize) -> Self { + pub fn num_shards(self, value: usize) -> Self { Self { - max_in_flight: Some(value), + num_shards: Some(value), ..self } } @@ -123,7 +115,7 @@ impl BackgroundBuilder { pub fn build(self) -> BackgroundConfig { BackgroundConfig { - max_in_flight: self.max_in_flight.unwrap_or(8), + num_shards: self.num_shards.unwrap_or(8), batch_size: self.batch_size, batch_length: self.batch_length, failure_mode: self.failure_mode.unwrap_or(BackpressureMode::Block), @@ -190,7 +182,6 @@ impl SyncBuilder { SyncConfig { batch_length: 
self.batch_length.unwrap_or(MAX_BATCH_LENGTH), linger_time: self.linger_time, - can_send_immediately: self.linger_time.is_some(), } } } diff --git a/core/sdk/src/clients/producer_config.rs b/core/sdk/src/clients/producer_config.rs new file mode 100644 index 00000000..0865a05c --- /dev/null +++ b/core/sdk/src/clients/producer_config.rs @@ -0,0 +1,52 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +use std::sync::Arc; + +use iggy_common::{IggyByteSize, IggyDuration}; + +use crate::clients::producer_sharding::Sharding; +use crate::prelude::ErrorCallback; + +#[derive(Debug, Clone)] +/// Determines how the `send_messages` API should behave when problem is encountered +pub enum BackpressureMode { + /// Block until the send succeeds + Block, + /// Block with a timeout, after which the send fails + BlockWithTimeout(IggyDuration), + /// Fail immediately without retrying + FailImmediately, +} + +#[derive(Debug)] +pub struct BackgroundConfig { + pub num_shards: usize, + pub batch_size: Option<usize>, + pub batch_length: Option<usize>, + pub failure_mode: BackpressureMode, + pub max_buffer_size: Option<IggyByteSize>, + pub linger_time: IggyDuration, + pub error_callback: Arc<Box<dyn ErrorCallback + Send + Sync>>, + pub sharding: Box<dyn Sharding + Send + Sync>, +} + +#[derive(Clone)] +pub struct SyncConfig { + pub batch_length: usize, + pub linger_time: Option<IggyDuration>, +} diff --git a/core/sdk/src/clients/producer_dispatcher.rs b/core/sdk/src/clients/producer_dispatcher.rs index 66d811fa..0cc38aa2 100644 --- a/core/sdk/src/clients/producer_dispatcher.rs +++ b/core/sdk/src/clients/producer_dispatcher.rs @@ -1,243 +1,34 @@ -use std::{ - sync::{ - Arc, - atomic::{AtomicBool, AtomicUsize, Ordering}, - }, -}; - +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::clients::producer::ProducerCore; +use crate::clients::producer_config::{BackgroundConfig, BackpressureMode}; +use crate::clients::producer_error_callback::ErrorCtx; +use crate::clients::producer_sharding::{Shard, ShardMessage}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use futures::FutureExt; use iggy_common::{ - Identifier, IggyByteSize, IggyDuration, IggyError, IggyMessage, Partitioning, Sizeable, + Identifier, IggyError, IggyMessage, Partitioning, Sizeable, }; -use tokio::{sync::Notify, task::JoinHandle}; -use tracing::error; -use super::producer::{ErrorCtx, ProducerCore}; - -use crate::prelude::ErrorCallback; - -#[derive(Debug, Clone)] -/// Determines how the `send_messages` API should behave when problem is encountered -pub enum BackpressureMode { - /// Block until the send succeeds - Block, - /// Block with a timeout, after which the send fails - BlockWithTimeout(IggyDuration), - /// Fail immediately without retrying - FailImmediately, -} - -#[derive(Debug)] -pub struct BackgroundConfig { - pub max_in_flight: usize, - pub batch_size: Option<usize>, - pub batch_length: Option<usize>, - pub failure_mode: BackpressureMode, - pub max_buffer_size: Option<IggyByteSize>, - pub linger_time: IggyDuration, - pub error_callback: Arc<Box<dyn ErrorCallback + Send + Sync>>, - pub sharding: Box<dyn Sharding + Send + Sync>, -} - -#[derive(Debug)] -pub struct ShardMessage { - pub stream: Arc<Identifier>, - pub topic: Arc<Identifier>, - pub messages: Vec<IggyMessage>, - pub partitioning: 
Option<Arc<Partitioning>>, -} - -impl Sizeable for ShardMessage { - fn get_size_bytes(&self) -> IggyByteSize { - let mut total = IggyByteSize::new(0); - - total += self.stream.get_size_bytes(); - total += self.topic.get_size_bytes(); - for msg in &self.messages { - total += msg.get_size_bytes(); - } - total - } -} - -pub struct Shard { - tx: flume::Sender<ShardMessage>, - _handle: JoinHandle<()>, -} - -impl Shard { - pub fn new( - core: Arc<ProducerCore>, - global_buffer: Arc<AtomicUsize>, - config: Arc<BackgroundConfig>, - notify: Arc<Notify>, - err_sender: flume::Sender<ErrorCtx>, - ) -> Self { - let (tx, rx) = flume::bounded::<ShardMessage>(1024); - - let handle = tokio::spawn(async move { - let mut buffer = Vec::new(); - let mut buffer_bytes = 0; - let mut last_flush = tokio::time::Instant::now(); - - loop { - let deadline = last_flush + config.linger_time.get_duration(); - tokio::select! { - maybe_msg = rx.recv_async() => { - match maybe_msg { - Ok(msg) => { - buffer_bytes += msg.get_size_bytes().as_bytes_usize(); - buffer.push(msg); - - let exceed_batch_len = config.batch_length - .map_or(false, |len| buffer.len() >= len); - let exceed_batch_size = config.batch_size - .map_or(false, |size| buffer_bytes >= size); - - if exceed_batch_len || exceed_batch_size { - Self::flush_buffer(&core, &mut buffer, &mut buffer_bytes, &global_buffer, &err_sender, &notify).await; - last_flush = tokio::time::Instant::now(); - } - } - Err(_) => break, - } - } - _ = tokio::time::sleep_until(deadline) => { - if !buffer.is_empty() { - Self::flush_buffer(&core, &mut buffer, &mut buffer_bytes, &global_buffer, &err_sender, &notify).await; - last_flush = tokio::time::Instant::now(); - } - } - } - } - }); - - Self { - tx, - _handle: handle, - } - } - - async fn flush_buffer( - core: &Arc<ProducerCore>, - buffer: &mut Vec<ShardMessage>, - buffer_bytes: &mut usize, - global_buffer: &Arc<AtomicUsize>, - err_sender: &flume::Sender<ErrorCtx>, - notify: &Arc<Notify>, - ) { - for msg in 
buffer.drain(..) { - let size = msg.get_size_bytes().as_bytes_usize(); - if let Err(err) = core - .send_internal( - &msg.stream, - &msg.topic, - msg.messages, - msg.partitioning.clone(), - ) - .await - { - if let IggyError::ProducerSendFailed { failed, cause } = &err { - let ctx = ErrorCtx { - cause: cause.clone(), - stream: msg.stream, - topic: msg.topic, - partitioning: msg.partitioning, - messages: failed.clone(), - }; - if err_sender.send_async(ctx).await.is_err() { - tracing::warn!("error-queue receiver has been dropped; lost error report"); - } - } else { - tracing::error!("background send failed: {err}"); - } - } - - global_buffer.fetch_sub(size, Ordering::Relaxed); - notify.notify_waiters(); - } - *buffer_bytes = 0; - } - - async fn send_with_block(&self, message: ShardMessage) -> Result<(), IggyError> { - self.tx.send_async(message).await.map_err(|e| { - error!("Failed to send_with_block: {e}"); - IggyError::BackgroundSendError - }) - } - - async fn send_with_timeout( - &self, - message: ShardMessage, - timeout: IggyDuration, - ) -> Result<(), IggyError> { - match tokio::time::timeout(timeout.get_duration(), self.tx.send_async(message)).await { - Ok(Ok(())) => Ok(()), - Ok(Err(e)) => { - error!("Channel send failed during timeout: {e}"); - Err(IggyError::BackgroundSendTimeout) - } - Err(_) => { - error!("Timeout elapsed before sending message batch"); - Err(IggyError::BackgroundSendTimeout) - } - } - } - - fn send_with_fail(&self, message: ShardMessage) -> Result<(), IggyError> { - self.tx.try_send(message).map_err(|_| { - error!("Channel is full, dropping message batch"); - IggyError::BackgroundSendError - }) - } - - async fn shutdown(self) { - drop(self.tx); - let _ = self._handle.await; - } -} - -pub trait Sharding: Send + Sync + std::fmt::Debug + 'static { - fn pick_shard( - &self, - shards: &[Shard], - messages: &[IggyMessage], - stream: &Identifier, - topic: &Identifier, - ) -> usize; -} - -pub struct BalancedSharding { - counter: AtomicUsize, -} - 
-impl Default for BalancedSharding { - fn default() -> Self { - Self { - counter: AtomicUsize::new(0), - } - } -} - -impl std::fmt::Debug for BalancedSharding { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("BalancedSharding").finish() - } -} -impl Sharding for BalancedSharding { - fn pick_shard( - &self, - shards: &[Shard], - _: &[IggyMessage], - _: &Identifier, - _: &Identifier, - ) -> usize { - self.counter.fetch_add(1, Ordering::Relaxed) % shards.len() - } -} +use tokio::{sync::Notify, task::JoinHandle}; pub struct ProducerDispatcher { - core: Arc<ProducerCore>, shards: Vec<Shard>, global_buffer: Arc<AtomicUsize>, notify: Arc<Notify>, @@ -248,7 +39,7 @@ pub struct ProducerDispatcher { impl ProducerDispatcher { pub fn new(core: Arc<ProducerCore>, config: BackgroundConfig) -> Self { - let mut shards = Vec::with_capacity(config.max_in_flight); + let mut shards = Vec::with_capacity(config.num_shards); let current_buffered_size = Arc::new(AtomicUsize::new(0)); let notify = Arc::new(Notify::new()); let config = Arc::new(config); @@ -267,7 +58,7 @@ impl ProducerDispatcher { tracing::debug!("error-callback worker finished"); }); - for _ in 0..config.max_in_flight { + for _ in 0..config.num_shards { shards.push(Shard::new( core.clone(), current_buffered_size.clone(), @@ -278,7 +69,6 @@ impl ProducerDispatcher { } Self { - core, shards, global_buffer: current_buffered_size, config, @@ -355,7 +145,7 @@ impl ProducerDispatcher { ); debug_assert!(shard_ix < self.shards.len()); let shard = &self.shards[shard_ix]; - shard.send_with_block(shard_message).await + shard.send(shard_message).await } pub async fn shutdown(mut self) { diff --git a/core/sdk/src/clients/producer_error_callback.rs b/core/sdk/src/clients/producer_error_callback.rs new file mode 100644 index 00000000..13a2ac45 --- /dev/null +++ b/core/sdk/src/clients/producer_error_callback.rs @@ -0,0 +1,70 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use std::pin::Pin; +use std::sync::Arc; +use std::fmt::Debug; + +use iggy_common::{Identifier, IggyMessage, Partitioning}; +use tracing::error; + +pub struct ErrorCtx { + pub cause: String, + pub stream: Arc<Identifier>, + pub topic: Arc<Identifier>, + pub partitioning: Option<Arc<Partitioning>>, + pub messages: Arc<Vec<IggyMessage>>, +} + +pub trait ErrorCallback: Send + Sync + Debug + 'static { + fn call(&self, ctx: ErrorCtx) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>; +} + +pub struct LogErrorCallback; + +impl Default for LogErrorCallback { + fn default() -> Self { + Self + } +} + +impl std::fmt::Debug for LogErrorCallback { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LogErrorCallback").finish() + } +} + +impl ErrorCallback for LogErrorCallback { + fn call(&self, ctx: ErrorCtx) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> { + Box::pin(async move { + let partitioning = ctx + .partitioning + .as_ref() + .map(|p| format!("{:?}", p)) + .unwrap_or_else(|| "None".to_string()); + + error!( + cause = ctx.cause, + stream = %ctx.stream, + topic = %ctx.topic, + partitioning = %partitioning, + num_messages = ctx.messages.len(), + "Failed to send messages in background task", + 
); + }) + } +} diff --git a/core/sdk/src/clients/producer_sharding.rs b/core/sdk/src/clients/producer_sharding.rs new file mode 100644 index 00000000..077809fb --- /dev/null +++ b/core/sdk/src/clients/producer_sharding.rs @@ -0,0 +1,200 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use iggy_common::{Identifier, IggyByteSize, IggyError, IggyMessage, Partitioning, Sizeable}; +use tokio::sync::Notify; +use tokio::task::JoinHandle; +use tracing::error; + +use crate::clients::producer::ProducerCore; +use crate::clients::producer_config::BackgroundConfig; +use crate::clients::producer_error_callback::ErrorCtx; + +pub trait Sharding: Send + Sync + std::fmt::Debug + 'static { + fn pick_shard( + &self, + shards: &[Shard], + messages: &[IggyMessage], + stream: &Identifier, + topic: &Identifier, + ) -> usize; +} + +pub struct BalancedSharding { + counter: AtomicUsize, +} + +impl Default for BalancedSharding { + fn default() -> Self { + Self { + counter: AtomicUsize::new(0), + } + } +} + +impl std::fmt::Debug for BalancedSharding { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BalancedSharding").finish() + } +} + +impl Sharding for BalancedSharding { + fn pick_shard( + &self, + shards: &[Shard], + _: &[IggyMessage], + _: &Identifier, + _: &Identifier, + ) -> usize { + self.counter.fetch_add(1, Ordering::Relaxed) % shards.len() + } +} + +#[derive(Debug)] +pub struct ShardMessage { + pub stream: Arc<Identifier>, + pub topic: Arc<Identifier>, + pub messages: Vec<IggyMessage>, + pub partitioning: Option<Arc<Partitioning>>, +} + +impl Sizeable for ShardMessage { + fn get_size_bytes(&self) -> IggyByteSize { + let mut total = IggyByteSize::new(0); + total += self.stream.get_size_bytes(); + total += self.topic.get_size_bytes(); + for msg in &self.messages { + total += msg.get_size_bytes(); + } + total + } +} + +pub(crate) struct Shard { + tx: flume::Sender<ShardMessage>, + _handle: JoinHandle<()>, +} + +impl Shard { + pub fn new( + core: Arc<ProducerCore>, + global_buffer: Arc<AtomicUsize>, + config: Arc<BackgroundConfig>, + notify: Arc<Notify>, + err_sender: flume::Sender<ErrorCtx>, + ) -> Self { + let (tx, rx) = 
flume::bounded::<ShardMessage>(1024); + + let handle = tokio::spawn(async move { + let mut buffer = Vec::new(); + let mut buffer_bytes = 0; + let mut last_flush = tokio::time::Instant::now(); + + loop { + let deadline = last_flush + config.linger_time.get_duration(); + tokio::select! { + maybe_msg = rx.recv_async() => { + match maybe_msg { + Ok(msg) => { + buffer_bytes += msg.get_size_bytes().as_bytes_usize(); + buffer.push(msg); + + let exceed_batch_len = config.batch_length + .map_or(false, |len| buffer.len() >= len); + let exceed_batch_size = config.batch_size + .map_or(false, |size| buffer_bytes >= size); + + if exceed_batch_len || exceed_batch_size { + Self::flush_buffer(&core, &mut buffer, &mut buffer_bytes, &global_buffer, &err_sender, &notify).await; + last_flush = tokio::time::Instant::now(); + } + } + Err(_) => break, + } + } + _ = tokio::time::sleep_until(deadline) => { + if !buffer.is_empty() { + Self::flush_buffer(&core, &mut buffer, &mut buffer_bytes, &global_buffer, &err_sender, &notify).await; + last_flush = tokio::time::Instant::now(); + } + } + } + } + }); + + Self { + tx, + _handle: handle, + } + } + + async fn flush_buffer( + core: &Arc<ProducerCore>, + buffer: &mut Vec<ShardMessage>, + buffer_bytes: &mut usize, + global_buffer: &Arc<AtomicUsize>, + err_sender: &flume::Sender<ErrorCtx>, + notify: &Arc<Notify>, + ) { + for msg in buffer.drain(..) 
{ + let size = msg.get_size_bytes().as_bytes_usize(); + if let Err(err) = core + .send_internal( + &msg.stream, + &msg.topic, + msg.messages, + msg.partitioning.clone(), + ) + .await + { + if let IggyError::ProducerSendFailed { failed, cause } = &err { + let ctx = ErrorCtx { + cause: cause.clone(), + stream: msg.stream, + topic: msg.topic, + partitioning: msg.partitioning, + messages: failed.clone(), + }; + if err_sender.send_async(ctx).await.is_err() { + tracing::warn!("error-queue receiver has been dropped; lost error report"); + } + } else { + tracing::error!("background send failed: {err}"); + } + } + + global_buffer.fetch_sub(size, Ordering::Relaxed); + notify.notify_waiters(); + } + *buffer_bytes = 0; + } + + pub(crate) async fn send(&self, message: ShardMessage) -> Result<(), IggyError> { + self.tx.send_async(message).await.map_err(|e| { + error!("Failed to send_with_block: {e}"); + IggyError::BackgroundSendError + }) + } + + pub(crate) async fn shutdown(self) { + drop(self.tx); + let _ = self._handle.await; + } +}
