This is an automated email from the ASF dual-hosted git repository.
bashirbekov pushed a commit to branch feat/add-background-send
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/feat/add-background-send by
this push:
new 09107ec0 del
09107ec0 is described below
commit 09107ec05da0458a78dad1eb5aca51c8fcc1181d
Author: haze518 <[email protected]>
AuthorDate: Wed Jun 4 10:33:01 2025 +0600
del
---
Cargo.lock | 1 +
core/examples/src/multi-tenant/producer/main.rs | 9 +-
core/examples/src/new-sdk/producer/main.rs | 10 +-
core/integration/tests/mod.rs | 1 +
core/integration/tests/sdk/mod.rs | 0
core/sdk/Cargo.toml | 3 +
core/sdk/src/clients/producer.rs | 172 ++++++------
core/sdk/src/clients/producer_builder.rs | 17 +-
core/sdk/src/clients/producer_config.rs | 2 +-
core/sdk/src/clients/producer_dispatcher.rs | 291 ++++++++++++++++-----
core/sdk/src/clients/producer_sharding.rs | 183 ++++++++++++-
core/sdk/src/prelude.rs | 2 +-
.../stream_builder/build/build_iggy_producer.rs | 4 +-
.../sdk/src/stream_builder/iggy_stream_producer.rs | 3 +-
14 files changed, 536 insertions(+), 162 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 9f632ea7..39e25fcb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3547,6 +3547,7 @@ dependencies = [
"futures-util",
"iggy_binary_protocol",
"iggy_common",
+ "mockall",
"num_cpus",
"quinn",
"reqwest",
diff --git a/core/examples/src/multi-tenant/producer/main.rs b/core/examples/src/multi-tenant/producer/main.rs
index 28dc9759..f867faa7 100644
--- a/core/examples/src/multi-tenant/producer/main.rs
+++ b/core/examples/src/multi-tenant/producer/main.rs
@@ -260,10 +260,13 @@ async fn create_producers(
let mut producers = Vec::new();
for topic in topics {
for id in 1..=producers_count {
- let mut producer = client
+ let producer = client
.producer(stream, topic)?
- .batch_length(batch_length)
- .linger_time(IggyDuration::from_str(interval).expect("Invalid duration"))
+ .sync(|b|
+ b
+ .batch_length(batch_length)
+ .linger_time(IggyDuration::from_str(interval).expect("Invalid duration"))
+ )
.partitioning(Partitioning::balanced())
.create_topic_if_not_exists(
partitions_count,
diff --git a/core/examples/src/new-sdk/producer/main.rs b/core/examples/src/new-sdk/producer/main.rs
index 52f60e7c..0da9afea 100644
--- a/core/examples/src/new-sdk/producer/main.rs
+++ b/core/examples/src/new-sdk/producer/main.rs
@@ -43,10 +43,14 @@ async fn main() -> anyhow::Result<(), Box<dyn Error>> {
let client = client_provider::get_raw_client(client_provider_config, false).await?;
let client = IggyClient::builder().with_client(client).build()?;
client.connect().await?;
- let mut producer = client
+ let interval = IggyDuration::from_str(&args.interval)?;
+ let producer = client
.producer(&args.stream_id, &args.topic_id)?
- .batch_length(args.messages_per_batch)
- .linger_time(IggyDuration::from_str(&args.interval)?)
+ .sync(|b|
+ b
+ .batch_length(args.messages_per_batch)
+ .linger_time(interval)
+ )
.partitioning(Partitioning::balanced())
.create_topic_if_not_exists(
3,
diff --git a/core/integration/tests/mod.rs b/core/integration/tests/mod.rs
index 32c49b8d..e29f3d5e 100644
--- a/core/integration/tests/mod.rs
+++ b/core/integration/tests/mod.rs
@@ -34,6 +34,7 @@ mod examples;
mod server;
mod state;
mod streaming;
+mod sdk;
lazy_static! {
static ref TESTS_FAILED: AtomicBool = AtomicBool::new(false);
diff --git a/core/integration/tests/sdk/mod.rs b/core/integration/tests/sdk/mod.rs
new file mode 100644
index 00000000..e69de29b
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index 32d1cb2f..fd42366e 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -60,3 +60,6 @@ tokio-rustls = { workspace = true }
tracing = { workspace = true }
trait-variant = { workspace = true }
webpki-roots = { workspace = true }
+
+[dev-dependencies]
+mockall.workspace = true
diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index 88a44204..c63ae01e 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -35,6 +35,20 @@ use std::time::Duration;
use tokio::time::{Interval, sleep};
use tracing::{error, info, trace, warn};
+#[cfg(test)]
+use mockall::automock;
+
+#[cfg_attr(test, automock)]
+pub trait ProducerCoreBackend: Send + Sync + 'static {
+ fn send_internal(
+ &self,
+ stream: &Identifier,
+ topic: &Identifier,
+ msgs: Vec<IggyMessage>,
+ partitioning: Option<Arc<Partitioning>>,
+ ) -> impl Future<Output = Result<(), IggyError>> + Send;
+}
+
pub struct ProducerCore {
initialized: AtomicBool,
can_send: Arc<AtomicBool>,
@@ -168,82 +182,6 @@ impl ProducerCore {
});
}
- pub(crate) async fn send_internal(
- &self,
- stream: &Identifier,
- topic: &Identifier,
- mut msgs: Vec<IggyMessage>,
- partitioning: Option<Arc<Partitioning>>,
- ) -> Result<(), IggyError> {
- if msgs.is_empty() {
- return Ok(());
- }
-
- if let Err(err) = self.encrypt_messages(&mut msgs) {
- return Err(IggyError::ProducerSendFailed {
- cause: err.to_string(),
- failed: Arc::new(msgs),
- });
- }
-
- let part = match self.get_partitioning(stream, topic, &msgs, partitioning.clone()) {
- Ok(p) => p,
- Err(err) => {
- return Err(IggyError::ProducerSendFailed {
- cause: err.to_string(),
- failed: Arc::new(msgs),
- });
- }
- };
-
- match &self.sync_config {
- Some(cfg) => {
- let linger_time_micros = match cfg.linger_time {
- Some(t) => t.as_micros(),
- None => 0
- };
- if linger_time_micros > 0 {
- Self::wait_before_sending(
- linger_time_micros,
- self.last_sent_at.load(ORDERING),
- )
- .await;
- }
-
- let max = cfg.batch_length;
- let mut index = 0;
- while index < msgs.len() {
- let end = (index + max).min(msgs.len());
- let chunk = &mut msgs[index..end];
-
- if let Err(err) = self.try_send_messages(stream, topic, &part, chunk).await {
- let failed_tail = msgs.split_off(index);
- return Err(IggyError::ProducerSendFailed {
- cause: err.to_string(),
- failed: Arc::new(failed_tail),
- });
- }
- self.last_sent_at
- .store(IggyTimestamp::now().into(), ORDERING);
- index = end;
- }
- }
- // background send on
- _ => {
- self.try_send_messages(stream, topic, &part, &mut msgs)
- .await
- .map_err(|err| IggyError::ProducerSendFailed {
- cause: err.to_string(),
- failed: Arc::new(msgs),
- })?;
- self.last_sent_at
- .store(IggyTimestamp::now().into(), ORDERING);
- }
- }
-
- Ok(())
- }
-
async fn try_send_messages(
&self,
stream: &Identifier,
@@ -414,6 +352,81 @@ impl ProducerCore {
}
}
+impl ProducerCoreBackend for ProducerCore {
+ async fn send_internal(
+ &self,
+ stream: &Identifier,
+ topic: &Identifier,
+ mut msgs: Vec<IggyMessage>,
+ partitioning: Option<Arc<Partitioning>>,
+ ) -> Result<(), IggyError> {
+ if msgs.is_empty() {
+ return Ok(());
+ }
+
+ if let Err(err) = self.encrypt_messages(&mut msgs) {
+ return Err(IggyError::ProducerSendFailed {
+ cause: err.to_string(),
+ failed: Arc::new(msgs),
+ });
+ }
+
+ let part = match self.get_partitioning(stream, topic, &msgs, partitioning.clone()) {
+ Ok(p) => p,
+ Err(err) => {
+ return Err(IggyError::ProducerSendFailed {
+ cause: err.to_string(),
+ failed: Arc::new(msgs),
+ });
+ }
+ };
+
+ match &self.sync_config {
+ Some(cfg) => {
+ let linger_time_micros = match cfg.linger_time {
+ Some(t) => t.as_micros(),
+ None => 0,
+ };
+ if linger_time_micros > 0 {
+ Self::wait_before_sending(linger_time_micros, self.last_sent_at.load(ORDERING))
+ .await;
+ }
+
+ let max = cfg.batch_length;
+ let mut index = 0;
+ while index < msgs.len() {
+ let end = (index + max).min(msgs.len());
+ let chunk = &mut msgs[index..end];
+
+ if let Err(err) = self.try_send_messages(stream, topic, &part, chunk).await {
+ let failed_tail = msgs.split_off(index);
+ return Err(IggyError::ProducerSendFailed {
+ cause: err.to_string(),
+ failed: Arc::new(failed_tail),
+ });
+ }
+ self.last_sent_at
+ .store(IggyTimestamp::now().into(), ORDERING);
+ index = end;
+ }
+ }
+ // background send on
+ _ => {
+ self.try_send_messages(stream, topic, &part, &mut msgs)
+ .await
+ .map_err(|err| IggyError::ProducerSendFailed {
+ cause: err.to_string(),
+ failed: Arc::new(msgs),
+ })?;
+ self.last_sent_at
+ .store(IggyTimestamp::now().into(), ORDERING);
+ }
+ }
+
+ Ok(())
+ }
+}
+
unsafe impl Send for IggyProducer {}
unsafe impl Sync for IggyProducer {}
@@ -466,7 +479,7 @@ impl IggyProducer {
send_retries_interval,
sync_config: match mode {
SendMode::Sync(ref cfg) => Some(cfg.clone()),
- _ => None
+ _ => None,
},
});
let dispatcher = match mode {
@@ -568,8 +581,3 @@ impl IggyProducer {
}
}
}
-
-fn default_shard_count() -> usize {
- let cpus = num_cpus::get();
- cpus.clamp(2, 16)
-}
diff --git a/core/sdk/src/clients/producer_builder.rs b/core/sdk/src/clients/producer_builder.rs
index 6f3e001b..bc2184d7 100644
--- a/core/sdk/src/clients/producer_builder.rs
+++ b/core/sdk/src/clients/producer_builder.rs
@@ -19,7 +19,8 @@ use super::MAX_BATCH_LENGTH;
use crate::clients::producer_config::{BackgroundConfig, BackpressureMode, SyncConfig};
use crate::clients::producer_error_callback::LogErrorCallback;
use crate::clients::producer_sharding::{BalancedSharding, Sharding};
-use crate::prelude::{ErrorCallback, IggyProducer};
+use crate::clients::producer_error_callback::ErrorCallback;
+use crate::prelude::IggyProducer;
use iggy_binary_protocol::Client;
use iggy_common::locking::IggySharedMut;
use iggy_common::{
@@ -99,6 +100,20 @@ impl BackgroundBuilder {
}
}
+ pub fn batch_size(self, value: usize) -> Self {
+ Self {
+ batch_size: Some(value),
+ ..self
+ }
+ }
+
+ pub fn sharding(self, sharding: Box<dyn Sharding>) -> Self {
+ Self {
+ sharding,
+ ..self
+ }
+ }
+
pub fn max_buffer_size(self, value: IggyByteSize) -> Self {
Self {
max_buffer_size: Some(value),
diff --git a/core/sdk/src/clients/producer_config.rs b/core/sdk/src/clients/producer_config.rs
index 0865a05c..2a94bca1 100644
--- a/core/sdk/src/clients/producer_config.rs
+++ b/core/sdk/src/clients/producer_config.rs
@@ -19,8 +19,8 @@ use std::sync::Arc;
use iggy_common::{IggyByteSize, IggyDuration};
+use crate::clients::producer_error_callback::ErrorCallback;
use crate::clients::producer_sharding::Sharding;
-use crate::prelude::ErrorCallback;
#[derive(Debug, Clone)]
/// Determines how the `send_messages` API should behave when problem is encountered
diff --git a/core/sdk/src/clients/producer_dispatcher.rs b/core/sdk/src/clients/producer_dispatcher.rs
index 0cc38aa2..c318e34b 100644
--- a/core/sdk/src/clients/producer_dispatcher.rs
+++ b/core/sdk/src/clients/producer_dispatcher.rs
@@ -15,16 +15,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-use crate::clients::producer::ProducerCore;
+use crate::clients::producer::{ProducerCore, ProducerCoreBackend};
use crate::clients::producer_config::{BackgroundConfig, BackpressureMode};
use crate::clients::producer_error_callback::ErrorCtx;
use crate::clients::producer_sharding::{Shard, ShardMessage};
+use futures::FutureExt;
+use iggy_common::{BytesSerializable, Identifier, IggyError, IggyMessage, Partitioning, SendMessages, Sizeable};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use futures::FutureExt;
-use iggy_common::{
- Identifier, IggyError, IggyMessage, Partitioning, Sizeable,
-};
use tokio::{sync::Notify, task::JoinHandle};
@@ -34,13 +32,14 @@ pub struct ProducerDispatcher {
notify: Arc<Notify>,
config: Arc<BackgroundConfig>,
closed: AtomicBool,
+ sender: flume::Sender<ShardMessage>,
_join_handle: JoinHandle<()>,
}
impl ProducerDispatcher {
- pub fn new(core: Arc<ProducerCore>, config: BackgroundConfig) -> Self {
+ pub fn new(core: Arc<impl ProducerCoreBackend>, config: BackgroundConfig) -> Self {
let mut shards = Vec::with_capacity(config.num_shards);
- let current_buffered_size = Arc::new(AtomicUsize::new(0));
+ let global_buffer = Arc::new(AtomicUsize::new(0));
let notify = Arc::new(Notify::new());
let config = Arc::new(config);
@@ -58,10 +57,78 @@ impl ProducerDispatcher {
tracing::debug!("error-callback worker finished");
});
+ let (tx, rx) = flume::bounded::<ShardMessage>(1);
+ let gb = global_buffer.clone();
+ let cfg = config.clone();
+ let not = notify.clone();
+ let _handle = tokio::spawn(async move {
+ loop {
+ match rx.recv_async().await {
+ Ok(msg) => {
+ let mut reserved = gb.load(Ordering::Relaxed);
+ if let Some(buffer_size) = cfg.max_buffer_size {
+ let batch_bytes = msg.get_size_bytes();
+ if batch_bytes.as_bytes_usize() > buffer_size.as_bytes_usize() {
+ return Err(IggyError::BackgroundSendBufferOverflow);
+ }
+ loop {
+ if buffer_size.as_bytes_usize() != 0
+ && reserved + batch_bytes.as_bytes_usize()
+ >= buffer_size.as_bytes_usize()
+ {
+ match cfg.failure_mode {
+ BackpressureMode::Block => {
+ not.notified().await;
+ continue;
+ }
+ BackpressureMode::BlockWithTimeout(t) => {
+ if tokio::time::timeout(
+ t.get_duration(),
+ not.notified(),
+ )
+ .await
+ .is_err()
+ {
+ return Err(IggyError::BackgroundSendTimeout);
+ }
+ continue;
+ }
+ BackpressureMode::FailImmediately => {
+ return Err(IggyError::BackgroundSendBufferOverflow);
+ }
+ };
+ }
+ match gb.compare_exchange(
+ reserved,
+ reserved + batch_bytes.as_bytes_usize(),
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => break,
+ Err(v) => reserved = v,
+ }
+ }
+ }
+
+ let shard_ix = cfg.sharding.pick_shard(
+ &self.shards,
+ &msg.messages,
+ &msg.stream,
+ &msg.topic,
+ );
+ debug_assert!(shard_ix < self.shards.len());
+ let shard = &self.shards[shard_ix];
+ shard.send(shard_message).await
+ }
+ Err(_) => break,
+ }
+ }
+ });
+
for _ in 0..config.num_shards {
shards.push(Shard::new(
core.clone(),
- current_buffered_size.clone(),
+ global_buffer.clone(),
config.clone(),
notify.clone(),
err_tx.clone(),
@@ -70,7 +137,7 @@ impl ProducerDispatcher {
Self {
shards,
- global_buffer: current_buffered_size,
+ global_buffer,
config,
notify,
closed: AtomicBool::new(false),
@@ -89,63 +156,12 @@ impl ProducerDispatcher {
return Err(IggyError::ProducerClosed);
}
- let shard_message = ShardMessage {
+ self.sender.send_async(ShardMessage {
messages,
stream,
topic,
partitioning,
- };
- let batch_bytes = shard_message.get_size_bytes();
-
- let mut reserved = self.global_buffer.load(Ordering::Relaxed);
- if let Some(buffer_size) = &self.config.max_buffer_size {
- if batch_bytes.as_bytes_usize() > buffer_size.as_bytes_usize() {
- return Err(IggyError::BackgroundSendBufferOverflow);
- }
- loop {
- if buffer_size.as_bytes_usize() != 0
- && reserved + batch_bytes.as_bytes_usize() > buffer_size.as_bytes_usize()
- {
- match self.config.failure_mode {
- BackpressureMode::Block => {
- self.notify.notified().await;
- continue;
- }
- BackpressureMode::BlockWithTimeout(t) => {
- if tokio::time::timeout(t.get_duration(), self.notify.notified())
- .await
- .is_err()
- {
- return Err(IggyError::BackgroundSendTimeout);
- }
- continue;
- }
- BackpressureMode::FailImmediately => {
- return Err(IggyError::BackgroundSendBufferOverflow);
- }
- };
- }
- match self.global_buffer.compare_exchange(
- reserved,
- reserved + batch_bytes.as_bytes_usize(),
- Ordering::AcqRel,
- Ordering::Acquire,
- ) {
- Ok(_) => break,
- Err(v) => reserved = v,
- }
- }
- }
-
- let shard_ix = self.config.sharding.pick_shard(
- &self.shards,
- &shard_message.messages,
- &shard_message.stream,
- &shard_message.topic,
- );
- debug_assert!(shard_ix < self.shards.len());
- let shard = &self.shards[shard_ix];
- shard.send(shard_message).await
+ }).await.map_err(|_| IggyError::BackgroundSendError)
}
pub async fn shutdown(mut self) {
@@ -163,3 +179,154 @@ impl ProducerDispatcher {
debug_assert_eq!(self.global_buffer.load(Ordering::Relaxed), 0);
}
}
+
+#[cfg(test)]
+mod tests {
+ use std::time::Duration;
+
+ use bytes::Bytes;
+ use tokio::sync::Mutex;
+ use tokio::time::{sleep, timeout};
+
+ use crate::clients::producer::MockProducerCoreBackend;
+ use crate::clients::producer_builder::BackgroundBuilder;
+
+ use super::*;
+
+ fn dummy_identifier() -> Arc<Identifier> {
+ Arc::new(Identifier::numeric(1).unwrap())
+ }
+
+ fn dummy_message(size: usize) -> IggyMessage {
+ IggyMessage::builder()
+ .payload(Bytes::from(vec![0u8; size]))
+ .build()
+ .unwrap()
+ }
+
+ #[tokio::test]
+ async fn test_dispatch_successful() {
+ let mut mock = MockProducerCoreBackend::new();
+ mock.expect_send_internal()
+ .times(1)
+ .returning(|_, _, _, _| Box::pin(async { Ok(()) }));
+
+ let msg = ShardMessage {
+ stream: dummy_identifier(),
+ topic: dummy_identifier(),
+ messages: vec![dummy_message(5)],
+ partitioning: None,
+ };
+ let config = BackgroundBuilder::default()
+ .max_buffer_size(msg.get_size_bytes() + 100.into())
+ .num_shards(1)
+ .build();
+
+ let dispatcher = ProducerDispatcher::new(Arc::new(mock), config);
+
+ let result = dispatcher
+ .dispatch(msg.messages, msg.topic, msg.stream, msg.partitioning)
+ .await;
+
+ sleep(Duration::from_millis(100)).await;
+
+ assert!(result.is_ok());
+ }
+
+ #[tokio::test]
+ async fn test_dispatch_buffer_overflow_immediate_fail() {
+ let mock = MockProducerCoreBackend::new();
+
+ let msg = ShardMessage {
+ stream: dummy_identifier(),
+ topic: dummy_identifier(),
+ messages: vec![dummy_message(5)],
+ partitioning: None,
+ };
+ let config = BackgroundBuilder::default()
+ .max_buffer_size(msg.get_size_bytes() - 10.into())
+ .failure_mode(BackpressureMode::FailImmediately)
+ .num_shards(1)
+ .build();
+
+ let dispatcher = ProducerDispatcher::new(Arc::new(mock), config);
+
+ let result = dispatcher
+ .dispatch(msg.messages, msg.topic, msg.stream, msg.partitioning)
+ .await;
+
+ assert!(matches!(
+ result,
+ Err(IggyError::BackgroundSendBufferOverflow)
+ ));
+ }
+
+ #[tokio::test]
+ async fn test_dispatch_buffer_overflow_block_with_timeout() {
+ let mock = MockProducerCoreBackend::new();
+
+ let msg = ShardMessage {
+ stream: dummy_identifier(),
+ topic: dummy_identifier(),
+ messages: vec![dummy_message(5)],
+ partitioning: None,
+ };
+ let config = BackgroundBuilder::default()
+ .max_buffer_size(msg.get_size_bytes() + 100.into())
+ .failure_mode(BackpressureMode::BlockWithTimeout(
+ Duration::from_millis(100).into(),
+ ))
+ .num_shards(1)
+ .build();
+
+ let dispatcher = ProducerDispatcher::new(Arc::new(mock), config);
+ dispatcher.global_buffer.store(100, Ordering::Relaxed);
+
+ let result = dispatcher
+ .dispatch(msg.messages, msg.topic, msg.stream, msg.partitioning)
+ .await;
+
+ assert!(matches!(result, Err(IggyError::BackgroundSendTimeout)));
+ }
+
+ #[tokio::test]
+ async fn test_dispatch_buffer_overflow_block_then_continue() {
+ let mut mock = MockProducerCoreBackend::new();
+ mock.expect_send_internal()
+ .times(1)
+ .returning(|_, _, _, _| Box::pin(async { Ok(()) }));
+
+ let msg = ShardMessage {
+ stream: dummy_identifier(),
+ topic: dummy_identifier(),
+ messages: vec![dummy_message(5)],
+ partitioning: None,
+ };
+ let config = BackgroundBuilder::default()
+ .max_buffer_size(msg.get_size_bytes() + 100.into())
+ .failure_mode(BackpressureMode::Block)
+ .num_shards(1)
+ .build();
+
+ let dispatcher = ProducerDispatcher::new(Arc::new(mock), config);
+ dispatcher.global_buffer.store(50, Ordering::Relaxed);
+
+ let notify = dispatcher.notify.clone();
+
+ let global_buffer = dispatcher.global_buffer.clone();
+ tokio::spawn(async move {
+ tokio::time::sleep(Duration::from_millis(150)).await;
+ global_buffer.fetch_sub(50, Ordering::Relaxed);
+ notify.notify_waiters();
+ });
+
+ let result = dispatcher
+ .dispatch(msg.messages, msg.topic, msg.stream, msg.partitioning)
+ .await;
+
+ dispatcher.notify.notified().await;
+ sleep(Duration::from_millis(100)).await;
+
+ assert!(result.is_ok())
+ }
+}
diff --git a/core/sdk/src/clients/producer_sharding.rs b/core/sdk/src/clients/producer_sharding.rs
index 077809fb..3ae5f682 100644
--- a/core/sdk/src/clients/producer_sharding.rs
+++ b/core/sdk/src/clients/producer_sharding.rs
@@ -23,7 +23,7 @@ use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tracing::error;
-use crate::clients::producer::ProducerCore;
+use crate::clients::producer::ProducerCoreBackend;
use crate::clients::producer_config::BackgroundConfig;
use crate::clients::producer_error_callback::ErrorCtx;
@@ -87,14 +87,14 @@ impl Sizeable for ShardMessage {
}
}
-pub(crate) struct Shard {
+pub struct Shard {
tx: flume::Sender<ShardMessage>,
_handle: JoinHandle<()>,
}
impl Shard {
pub fn new(
- core: Arc<ProducerCore>,
+ core: Arc<impl ProducerCoreBackend>,
global_buffer: Arc<AtomicUsize>,
config: Arc<BackgroundConfig>,
notify: Arc<Notify>,
@@ -146,7 +146,7 @@ impl Shard {
}
async fn flush_buffer(
- core: &Arc<ProducerCore>,
+ core: &Arc<impl ProducerCoreBackend>,
buffer: &mut Vec<ShardMessage>,
buffer_bytes: &mut usize,
global_buffer: &Arc<AtomicUsize>,
@@ -198,3 +198,178 @@ impl Shard {
let _ = self._handle.await;
}
}
+
+#[cfg(test)]
+mod tests {
+ use std::time::Duration;
+
+ use bytes::Bytes;
+ use iggy_common::IggyDuration;
+ use tokio::{sync::Notify, time::sleep};
+
+ use super::*;
+ use crate::clients::{producer::MockProducerCoreBackend, producer_builder::BackgroundBuilder};
+
+ fn dummy_identifier() -> Arc<Identifier> {
+ Arc::new(Identifier::numeric(1).unwrap())
+ }
+
+ fn dummy_message(size: usize) -> IggyMessage {
+ IggyMessage::builder()
+ .payload(Bytes::from(vec![0u8; size]))
+ .build()
+ .unwrap()
+ }
+
+ #[tokio::test]
+ async fn test_shard_flushes_by_batch_length() {
+ let mut mock = MockProducerCoreBackend::new();
+ mock.expect_send_internal()
+ .times(10)
+ .returning(|_, _, _, _| Box::pin(async { Ok(()) }));
+
+ let bb = BackgroundBuilder::default()
+ .batch_length(10)
+ .linger_time(IggyDuration::new_from_secs(1))
+ .batch_size(10_000);
+ let shard = Shard::new(
+ Arc::new(mock),
+ Arc::new(AtomicUsize::new(0)),
+ Arc::new(bb.build()),
+ Arc::new(Notify::new()),
+ flume::unbounded().0,
+ );
+
+ for _ in 0..10 {
+ shard
+ .send(ShardMessage {
+ stream: dummy_identifier(),
+ topic: dummy_identifier(),
+ messages: vec![dummy_message(1)],
+ partitioning: None,
+ })
+ .await
+ .unwrap();
+ }
+
+ sleep(Duration::from_millis(100)).await;
+ }
+
+ #[tokio::test]
+ async fn test_shard_flushes_by_batch_size() {
+ let mut mock = MockProducerCoreBackend::new();
+ mock.expect_send_internal()
+ .times(1)
+ .returning(|_, _, _, _| Box::pin(async { Ok(()) }));
+
+ let bb = BackgroundBuilder::default()
+ .batch_length(1000)
+ .linger_time(IggyDuration::new_from_secs(1))
+ .batch_size(10_000);
+ let shard = Shard::new(
+ Arc::new(mock),
+ Arc::new(AtomicUsize::new(0)),
+ Arc::new(bb.build()),
+ Arc::new(Notify::new()),
+ flume::unbounded().0,
+ );
+
+ shard
+ .send(ShardMessage {
+ stream: dummy_identifier(),
+ topic: dummy_identifier(),
+ messages: vec![dummy_message(10_000)],
+ partitioning: None,
+ })
+ .await
+ .unwrap();
+
+ sleep(Duration::from_millis(100)).await;
+ }
+
+ #[tokio::test]
+ async fn test_shard_flushes_by_timeout() {
+ let mut mock = MockProducerCoreBackend::new();
+ mock.expect_send_internal().times(1).returning(|_, _, _, _| Box::pin(async { Ok(()) }));
+
+ let bb = BackgroundBuilder::default()
+ .batch_length(10)
+ .linger_time(IggyDuration::new(Duration::from_millis(50)))
+ .batch_size(10_000);
+ let shard = Shard::new(
+ Arc::new(mock),
+ Arc::new(AtomicUsize::new(0)),
+ Arc::new(bb.build()),
+ Arc::new(Notify::new()),
+ flume::unbounded().0,
+ );
+
+ shard
+ .send(ShardMessage {
+ stream: dummy_identifier(),
+ topic: dummy_identifier(),
+ messages: vec![dummy_message(1)],
+ partitioning: None,
+ })
+ .await
+ .unwrap();
+
+ sleep(Duration::from_millis(100)).await;
+ }
+
+ #[tokio::test]
+ async fn test_shard_forwards_error() {
+ let mut mock = MockProducerCoreBackend::new();
+ let error = IggyError::ProducerSendFailed {
+ failed: Arc::new(vec![dummy_message(1)]),
+ cause: "test error".into(),
+ };
+
+ mock.expect_send_internal().returning(move |_, _, _, _| {
+ let err = error.clone();
+ Box::pin(async move { Err(err) })
+ });
+
+ let (err_tx, err_rx) = flume::unbounded();
+ let bb = BackgroundBuilder::default();
+ let shard = Shard::new(
+ Arc::new(mock),
+ Arc::new(AtomicUsize::new(0)),
+ Arc::new(bb.build()),
+ Arc::new(Notify::new()),
+ err_tx,
+ );
+
+ shard.send(ShardMessage {
+ stream: dummy_identifier(),
+ topic: dummy_identifier(),
+ messages: vec![dummy_message(1)],
+ partitioning: None,
+ }).await.unwrap();
+
+ let err_ctx = err_rx.recv_async().await.unwrap();
+ assert_eq!(err_ctx.cause, "test error");
+ assert_eq!(err_ctx.messages.len(), 1);
+ }
+
+ #[tokio::test]
+ async fn test_shard_send_error_on_closed_channel() {
+ let (tx, rx) = flume::bounded::<ShardMessage>(1);
+
+ drop(rx);
+
+ let shard = Shard {
+ tx,
+ _handle: tokio::spawn(async {}),
+ };
+
+ let result = shard.send(ShardMessage {
+ stream: dummy_identifier(),
+ topic: dummy_identifier(),
+ messages: vec![dummy_message(1)],
+ partitioning: None,
+ }).await;
+
+ assert!(matches!(result, Err(IggyError::BackgroundSendError)));
+ }
+}
diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs
index 9b13f218..3d87cc82 100644
--- a/core/sdk/src/prelude.rs
+++ b/core/sdk/src/prelude.rs
@@ -35,7 +35,7 @@ pub use crate::clients::consumer::{
AutoCommit, AutoCommitAfter, AutoCommitWhen, IggyConsumer, ReceivedMessage,
};
pub use crate::clients::consumer_builder::IggyConsumerBuilder;
-pub use crate::clients::producer::{IggyProducer, ErrorCallback};
+pub use crate::clients::producer::IggyProducer;
pub use crate::clients::producer_builder::IggyProducerBuilder;
pub use crate::consumer_ext::IggyConsumerMessageExt;
pub use crate::stream_builder::IggyConsumerConfig;
diff --git a/core/sdk/src/stream_builder/build/build_iggy_producer.rs b/core/sdk/src/stream_builder/build/build_iggy_producer.rs
index 85409c5f..eb6b2167 100644
--- a/core/sdk/src/stream_builder/build/build_iggy_producer.rs
+++ b/core/sdk/src/stream_builder/build/build_iggy_producer.rs
@@ -17,9 +17,7 @@
*/
use crate::clients::client::IggyClient;
-use crate::clients::producer::{ErrorCallback, IggyProducer};
-use crate::clients::producer_builder::{SendMode, SyncBuilder};
-use crate::clients::producer_dispatcher::Sharding;
+use crate::clients::producer::IggyProducer;
use crate::prelude::{IggyError, IggyExpiry, MaxTopicSize};
use crate::stream_builder::IggyProducerConfig;
use tracing::{error, trace};
diff --git a/core/sdk/src/stream_builder/iggy_stream_producer.rs b/core/sdk/src/stream_builder/iggy_stream_producer.rs
index 0a7174ce..31669518 100644
--- a/core/sdk/src/stream_builder/iggy_stream_producer.rs
+++ b/core/sdk/src/stream_builder/iggy_stream_producer.rs
@@ -17,8 +17,7 @@
*/
use crate::clients::client::IggyClient;
-use crate::clients::producer::{IggyProducer, ErrorCallback};
-use crate::clients::producer_dispatcher::Sharding;
+use crate::clients::producer::IggyProducer;
use crate::prelude::{IggyError, SystemClient};
use crate::stream_builder::{IggyProducerConfig, build};
use tracing::trace;