This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch refactor/replace-sync-with-direct in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 7ac83bdcedb154975d57b471fd0b690bc7c992a2 Author: haze518 <ashr...@gmail.com> AuthorDate: Fri Jun 13 10:00:12 2025 +0600 refactor: rename sync configuration to direct --- core/common/src/error/iggy_error.rs | 7 +++- core/connectors/runtime/src/source.rs | 6 +-- core/examples/src/multi-tenant/producer/main.rs | 4 +- core/examples/src/new-sdk/producer/main.rs | 4 +- core/examples/src/sink-data-producer/main.rs | 8 ++-- core/sdk/src/clients/mod.rs | 1 + core/sdk/src/clients/producer.rs | 39 +++++++++--------- core/sdk/src/clients/producer_builder.rs | 14 +++---- core/sdk/src/clients/producer_config.rs | 13 +++--- core/sdk/src/clients/producer_dispatcher.rs | 4 +- core/sdk/src/clients/producer_error_callback.rs | 10 +++-- core/sdk/src/clients/producer_sharding.rs | 47 +++++++++++++++++----- core/sdk/src/prelude.rs | 2 +- .../stream_builder/build/build_iggy_producer.rs | 6 +-- 14 files changed, 99 insertions(+), 66 deletions(-) diff --git a/core/common/src/error/iggy_error.rs b/core/common/src/error/iggy_error.rs index d8b8d005..cc8a284b 100644 --- a/core/common/src/error/iggy_error.rs +++ b/core/common/src/error/iggy_error.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use strum::{EnumDiscriminants, FromRepr, IntoStaticStr}; use thiserror::Error; -#[derive(Clone, Debug, Error, EnumDiscriminants, IntoStaticStr, FromRepr)] +#[derive(Clone, Debug, Error, EnumDiscriminants, IntoStaticStr, FromRepr, Default)] #[repr(u32)] #[strum(serialize_all = "snake_case")] #[strum_discriminants( @@ -31,6 +31,7 @@ use thiserror::Error; strum(serialize_all = "snake_case") )] pub enum IggyError { + #[default] #[error("Error")] Error = 1, #[error("Invalid configuration")] @@ -375,8 +376,10 @@ pub enum IggyError { BackgroundSendBufferOverflow = 4055, #[error("Producer send failed")] ProducerSendFailed { - cause: String, + cause: Box<IggyError>, failed: Arc<Vec<IggyMessage>>, + stream_name: String, + topic_name: String, } = 4056, #[error("Producer closed")] ProducerClosed = 4057, diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs index b6cdeb4d..6bc6a526 100644 --- a/core/connectors/runtime/src/source.rs +++ b/core/connectors/runtime/src/source.rs @@ -20,7 +20,7 @@ use dashmap::DashMap; use dlopen2::wrapper::Container; use flume::{Receiver, Sender}; use iggy::prelude::{ - HeaderKey, HeaderValue, IggyClient, IggyDuration, IggyError, IggyMessage, SyncConfig, + DirectConfig, HeaderKey, HeaderValue, IggyClient, IggyDuration, IggyError, IggyMessage, }; use iggy_connector_sdk::{ DecodedMessage, Error, ProducedMessages, StreamEncoder, TopicMetadata, transforms::Transform, @@ -120,8 +120,8 @@ pub async fn init( let batch_length = stream.batch_length.unwrap_or(1000); let producer = iggy_client .producer(&stream.stream, &stream.topic)? - .sync( - SyncConfig::builder() + .direct( + DirectConfig::builder() .batch_length(batch_length) .linger_time(linger_time) .build(), diff --git a/core/examples/src/multi-tenant/producer/main.rs b/core/examples/src/multi-tenant/producer/main.rs index 5931f1b8..456b1c29 100644 --- a/core/examples/src/multi-tenant/producer/main.rs +++ b/core/examples/src/multi-tenant/producer/main.rs @@ -262,8 +262,8 @@ async fn create_producers( for id in 1..=producers_count { let producer = client .producer(stream, topic)? - .sync( - SyncConfig::builder() + .direct( + DirectConfig::builder() .batch_length(batch_length) .linger_time(IggyDuration::from_str(interval).expect("Invalid duration")) .build(), diff --git a/core/examples/src/new-sdk/producer/main.rs b/core/examples/src/new-sdk/producer/main.rs index 62d2b827..8f4b7cf5 100644 --- a/core/examples/src/new-sdk/producer/main.rs +++ b/core/examples/src/new-sdk/producer/main.rs @@ -46,8 +46,8 @@ async fn main() -> anyhow::Result<(), Box<dyn Error>> { let interval = IggyDuration::from_str(&args.interval)?; let producer = client .producer(&args.stream_id, &args.topic_id)? - .sync( - SyncConfig::builder() + .direct( + DirectConfig::builder() .batch_length(args.messages_per_batch) .linger_time(interval) .build(), diff --git a/core/examples/src/sink-data-producer/main.rs b/core/examples/src/sink-data-producer/main.rs index e97b51fb..d8a63f16 100644 --- a/core/examples/src/sink-data-producer/main.rs +++ b/core/examples/src/sink-data-producer/main.rs @@ -20,8 +20,8 @@ use std::{env, str::FromStr, time::Duration}; use chrono::{DateTime, Days, Utc}; use iggy::prelude::{ - Client, IggyClient, IggyClientBuilder, IggyDuration, IggyError, IggyMessage, Partitioning, - SyncConfig, + Client, DirectConfig, IggyClient, IggyClientBuilder, IggyDuration, IggyError, IggyMessage, + Partitioning, }; use rand::{ Rng, @@ -58,8 +58,8 @@ async fn main() -> Result<(), DataProducerError> { let client = create_client(&address, &username, &password).await?; let producer = client .producer(&stream, &topic)? - .sync( - SyncConfig::builder() + .direct( + DirectConfig::builder() .batch_length(1000) .linger_time(IggyDuration::from_str("5ms").unwrap()) .build(), diff --git a/core/sdk/src/clients/mod.rs b/core/sdk/src/clients/mod.rs index e50bc443..9f51cc52 100644 --- a/core/sdk/src/clients/mod.rs +++ b/core/sdk/src/clients/mod.rs @@ -39,3 +39,4 @@ pub mod producer_sharding; const ORDERING: std::sync::atomic::Ordering = std::sync::atomic::Ordering::SeqCst; const MAX_BATCH_LENGTH: usize = 1000000; +const MIB: usize = 1_048_576; diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs index 2480ea97..43ecf7b0 100644 --- a/core/sdk/src/clients/producer.rs +++ b/core/sdk/src/clients/producer.rs @@ -18,7 +18,7 @@ use super::ORDERING; use crate::clients::MAX_BATCH_LENGTH; use crate::clients::producer_builder::SendMode; -use crate::clients::producer_config::SyncConfig; +use crate::clients::producer_config::DirectConfig; use crate::clients::producer_dispatcher::ProducerDispatcher; use bytes::Bytes; use futures_util::StreamExt; @@ -70,7 +70,7 @@ pub struct ProducerCore { last_sent_at: Arc<AtomicU64>, send_retries_count: Option<u32>, send_retries_interval: Option<IggyDuration>, - sync_config: Option<SyncConfig>, + direct_config: Option<DirectConfig>, } impl ProducerCore { @@ -345,6 +345,15 @@ impl ProducerCore { ); sleep(Duration::from_micros(remaining)).await; } + + fn make_failed_error(&self, cause: IggyError, failed: Vec<IggyMessage>) -> IggyError { + IggyError::ProducerSendFailed { + cause: Box::new(cause), + failed: Arc::new(failed), + stream_name: self.stream_name.clone(), + topic_name: self.topic_name.clone(), + } + } } impl ProducerCoreBackend for ProducerCore { @@ -360,23 +369,17 @@ impl ProducerCoreBackend for ProducerCore { } if let Err(err) = self.encrypt_messages(&mut msgs) { - return Err(IggyError::ProducerSendFailed { - cause: err.to_string(), - failed: Arc::new(msgs), - }); + return Err(self.make_failed_error(err, msgs)); } let part = match self.get_partitioning(stream, topic, &msgs, partitioning.clone()) { Ok(p) => p, Err(err) => { - return Err(IggyError::ProducerSendFailed { - cause: err.to_string(), - failed: Arc::new(msgs), - }); + return Err(self.make_failed_error(err, msgs)); } }; - match &self.sync_config { + match &self.direct_config { Some(cfg) => { let linger_time_micros = cfg.linger_time.as_micros(); if linger_time_micros > 0 { @@ -396,10 +399,7 @@ impl ProducerCoreBackend for ProducerCore { if let Err(err) = self.try_send_messages(stream, topic, &part, chunk).await { let failed_tail = msgs.split_off(index); - return Err(IggyError::ProducerSendFailed { - cause: err.to_string(), - failed: Arc::new(failed_tail), - }); + return Err(self.make_failed_error(err, failed_tail)); } self.last_sent_at .store(IggyTimestamp::now().into(), ORDERING); @@ -410,10 +410,7 @@ impl ProducerCoreBackend for ProducerCore { _ => { self.try_send_messages(stream, topic, &part, &mut msgs) .await - .map_err(|err| IggyError::ProducerSendFailed { - cause: err.to_string(), - failed: Arc::new(msgs), - })?; + .map_err(|err| self.make_failed_error(err, msgs))?; self.last_sent_at .store(IggyTimestamp::now().into(), ORDERING); } @@ -473,8 +470,8 @@ impl IggyProducer { last_sent_at: Arc::new(AtomicU64::new(0)), send_retries_count, send_retries_interval, - sync_config: match mode { - SendMode::Sync(ref cfg) => Some(cfg.clone()), + direct_config: match mode { + SendMode::Direct(ref cfg) => Some(cfg.clone()), _ => None, }, }); diff --git a/core/sdk/src/clients/producer_builder.rs b/core/sdk/src/clients/producer_builder.rs index 10e58f77..fcd0c4a0 100644 --- a/core/sdk/src/clients/producer_builder.rs +++ b/core/sdk/src/clients/producer_builder.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::clients::producer_config::{BackgroundConfig, SyncConfig}; +use crate::clients::producer_config::{BackgroundConfig, DirectConfig}; use crate::prelude::IggyProducer; use iggy_binary_protocol::Client; use iggy_common::locking::IggySharedMut; @@ -25,13 +25,13 @@ use iggy_common::{ use std::sync::Arc; pub enum SendMode { - Sync(SyncConfig), + Direct(DirectConfig), Background(BackgroundConfig), } impl Default for SendMode { fn default() -> Self { - SendMode::Sync(SyncConfig::builder().build()) + SendMode::Direct(DirectConfig::builder().build()) } } @@ -198,15 +198,15 @@ impl IggyProducerBuilder { } } - /// Sets the producer to use synchronous (direct) message sending. + /// Sets the producer to use direct message sending. /// This mode ensures that messages are sent immediately to the server /// without being buffered or delayed. - pub fn sync(mut self, config: SyncConfig) -> Self { - self.mode = SendMode::Sync(config); + pub fn direct(mut self, config: DirectConfig) -> Self { + self.mode = SendMode::Direct(config); self } - /// Sets the producer to use asynchronous (background) message sending. + /// Sets the producer to use background message sending. /// This mode buffers messages and sends them in the background. pub fn background(mut self, config: BackgroundConfig) -> Self { self.mode = SendMode::Background(config); diff --git a/core/sdk/src/clients/producer_config.rs b/core/sdk/src/clients/producer_config.rs index 8fe83ed2..dd3f301d 100644 --- a/core/sdk/src/clients/producer_config.rs +++ b/core/sdk/src/clients/producer_config.rs @@ -15,6 +15,7 @@ * specific language governing permissions and limitations * under the License. */ +use crate::clients::MIB; use crate::clients::producer_error_callback::{ErrorCallback, LogErrorCallback}; use crate::clients::producer_sharding::{BalancedSharding, Sharding}; use bon::Builder; @@ -87,7 +88,7 @@ pub struct BackgroundConfig { pub sharding: Box<dyn Sharding + Send + Sync>, /// Maximum **total size in bytes** of a batch. /// `0` ⇒ unlimited (size-based batching disabled). - #[builder(default = 1_048_576)] + #[builder(default = MIB)] pub batch_size: usize, /// Maximum **number of messages** per batch. /// `0` ⇒ unlimited (length-based batching disabled). @@ -98,7 +99,7 @@ pub struct BackgroundConfig { pub failure_mode: BackpressureMode, /// Upper bound for the **bytes held in memory** across *all* shards. /// `IggyByteSize::from(0)` ⇒ unlimited. - #[builder(default = IggyByteSize::from(32 * 1_048_576))] + #[builder(default = IggyByteSize::from(32 * MIB as u64))] pub max_buffer_size: IggyByteSize, /// Maximum number of **in-flight requests** (batches being sent). /// `0` ⇒ unlimited. @@ -114,20 +115,20 @@ pub struct BackgroundConfig { /// use iggy_common::IggyDuration; /// /// // Send messages one-by-one (max latency, min memory per request) -/// let cfg = SyncConfig::builder() +/// let cfg = DirectConfig::builder() /// .batch_length(1) /// .linger_time(IggyDuration::from(0)) /// .build(); /// /// // Send in chunks of up to 500 messages, /// // with a delay of at least 200 ms between consecutive sends. -/// let cfg = SyncConfig::builder() +/// let cfg = DirectConfig::builder() /// .batch_length(500) /// .linger_time(IggyDuration::from(200)) /// .build(); /// ``` #[derive(Clone, Builder)] -pub struct SyncConfig { +pub struct DirectConfig { /// Maximum number of messages to pack into **one** synchronous request. /// `0` ⇒ MAX_BATCH_LENTH(). #[builder(default = 1000)] @@ -139,5 +140,5 @@ pub struct SyncConfig { fn default_shard_count() -> usize { let cpus = num_cpus::get(); - cpus.clamp(2, 16) + cpus.clamp(2, 64) } diff --git a/core/sdk/src/clients/producer_dispatcher.rs b/core/sdk/src/clients/producer_dispatcher.rs index 5ef07158..fa6dded4 100644 --- a/core/sdk/src/clients/producer_dispatcher.rs +++ b/core/sdk/src/clients/producer_dispatcher.rs @@ -425,8 +425,10 @@ mod tests { .returning(|_, _, _, _| { Box::pin(async { Err(IggyError::ProducerSendFailed { - cause: "some_error".to_string(), + cause: Box::new(IggyError::Error), failed: Arc::new(vec![dummy_message(10)]), + stream_name: "1".to_string(), + topic_name: "1".to_string(), }) }) }); diff --git a/core/sdk/src/clients/producer_error_callback.rs b/core/sdk/src/clients/producer_error_callback.rs index bfb16ac2..b8ce5890 100644 --- a/core/sdk/src/clients/producer_error_callback.rs +++ b/core/sdk/src/clients/producer_error_callback.rs @@ -15,7 +15,7 @@ * specific language governing permissions and limitations * under the License. */ -use iggy_common::{Identifier, IggyMessage, Partitioning}; +use iggy_common::{Identifier, IggyError, IggyMessage, Partitioning}; use std::fmt::Debug; use std::pin::Pin; use std::sync::Arc; @@ -23,9 +23,11 @@ use tracing::error; #[derive(Debug)] pub struct ErrorCtx { - pub cause: String, + pub cause: Box<IggyError>, pub stream: Arc<Identifier>, + pub stream_name: String, pub topic: Arc<Identifier>, + pub topic_name: String, pub partitioning: Option<Arc<Partitioning>>, pub messages: Arc<Vec<IggyMessage>>, } @@ -54,9 +56,11 @@ impl ErrorCallback for LogErrorCallback { .unwrap_or_else(|| "None".to_string()); error!( - cause = ctx.cause, + cause = %ctx.cause, stream = %ctx.stream, + stream_name = ctx.stream_name, topic = %ctx.topic, + topic_name = ctx.topic_name, partitioning = %partitioning, num_messages = ctx.messages.len(), "Failed to send messages in background task", diff --git a/core/sdk/src/clients/producer_sharding.rs b/core/sdk/src/clients/producer_sharding.rs index 5cbe45f5..49c2e17a 100644 --- a/core/sdk/src/clients/producer_sharding.rs +++ b/core/sdk/src/clients/producer_sharding.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use tokio::sync::{OwnedSemaphorePermit, broadcast}; use tokio::task::JoinHandle; -use tracing::error; +use tracing::{debug, error}; /// A strategy for distributing messages across shards. /// @@ -130,12 +130,31 @@ impl Shard { Ok(msg) => { buffer_bytes += msg.inner.get_size_bytes().as_bytes_usize(); buffer.push(msg); + debug!( + buffer_len = buffer.len(), + buffer_bytes, + "Added message to buffer" + ); let exceed_batch_len = config.batch_length != 0 && buffer.len() >= config.batch_length; let exceed_batch_size = config.batch_size != 0 && buffer_bytes >= config.batch_size; if exceed_batch_len || exceed_batch_size { + debug!( + exceed_batch_len, + exceed_batch_size, + "Flushing buffer (trigger: batch_len={}, batch_size={})", + exceed_batch_len, + exceed_batch_size, + ); + Self::flush_buffer(&core, &mut buffer, &mut buffer_bytes, &err_sender).await; + debug!( + new_buffer_len = buffer.len(), + new_buffer_bytes = buffer_bytes, + "Buffer flushed" + ); + last_flush = tokio::time::Instant::now(); } } @@ -183,11 +202,19 @@ impl Shard { .await; if let Err(err) = result { - if let IggyError::ProducerSendFailed { failed, cause } = &err { + if let IggyError::ProducerSendFailed { + failed, + cause, + stream_name, + topic_name, + } = &err + { let ctx = ErrorCtx { - cause: cause.clone(), + cause: cause.to_owned(), stream: msg.inner.stream, + stream_name: stream_name.clone(), topic: msg.inner.topic, + topic_name: topic_name.clone(), partitioning: msg.inner.partitioning, messages: failed.clone(), }; @@ -250,12 +277,8 @@ mod tests { Arc::new(Semaphore::new(100)), ); - let shard = Shard::new( - Arc::new(mock), - config, - flume::unbounded().0, - broadcast::channel(1).1, - ); + let (_stop_tx, stop_rx) = broadcast::channel(1); + let shard = Shard::new(Arc::new(mock), config, flume::unbounded().0, stop_rx); for _ in 0..10 { let message = ShardMessage { @@ -358,7 +381,9 @@ mod tests { let mut mock = MockProducerCoreBackend::new(); let error = IggyError::ProducerSendFailed { failed: Arc::new(vec![dummy_message(1)]), - cause: "test error".into(), + cause: Box::new(IggyError::Error), + stream_name: "1".to_string(), + topic_name: "1".to_string(), }; mock.expect_send_internal().returning(move |_, _, _, _| { @@ -392,7 +417,7 @@ mod tests { shard.send(wrapped).await.unwrap(); let err_ctx = err_rx.recv_async().await.unwrap(); - assert_eq!(err_ctx.cause, "test error"); + assert_eq!(err_ctx.cause, Box::new(IggyError::Error)); assert_eq!(err_ctx.messages.len(), 1); } diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs index 62d91e12..b2afacbb 100644 --- a/core/sdk/src/prelude.rs +++ b/core/sdk/src/prelude.rs @@ -37,7 +37,7 @@ pub use crate::clients::consumer::{ pub use crate::clients::consumer_builder::IggyConsumerBuilder; pub use crate::clients::producer::IggyProducer; pub use crate::clients::producer_builder::IggyProducerBuilder; -pub use crate::clients::producer_config::{BackgroundConfig, SyncConfig}; +pub use crate::clients::producer_config::{BackgroundConfig, DirectConfig}; pub use crate::consumer_ext::IggyConsumerMessageExt; pub use crate::stream_builder::IggyConsumerConfig; pub use crate::stream_builder::IggyStreamConsumer; diff --git a/core/sdk/src/stream_builder/build/build_iggy_producer.rs b/core/sdk/src/stream_builder/build/build_iggy_producer.rs index 59a138cc..9522452b 100644 --- a/core/sdk/src/stream_builder/build/build_iggy_producer.rs +++ b/core/sdk/src/stream_builder/build/build_iggy_producer.rs @@ -18,7 +18,7 @@ use crate::clients::client::IggyClient; use crate::clients::producer::IggyProducer; -use crate::clients::producer_config::SyncConfig; +use crate::clients::producer_config::DirectConfig; use crate::prelude::{IggyError, IggyExpiry, MaxTopicSize}; use crate::stream_builder::IggyProducerConfig; use tracing::{error, trace}; @@ -66,8 +66,8 @@ pub(crate) async fn build_iggy_producer( IggyExpiry::ServerDefault, MaxTopicSize::ServerDefault, ) - .sync( - SyncConfig::builder() + .direct( + DirectConfig::builder() .batch_length(batch_length) .linger_time(linger_time) .build(),