This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch refactor/replace-sync-with-direct
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 7ac83bdcedb154975d57b471fd0b690bc7c992a2
Author: haze518 <ashr...@gmail.com>
AuthorDate: Fri Jun 13 10:00:12 2025 +0600

    refactor: rename sync configuration to direct
---
 core/common/src/error/iggy_error.rs                |  7 +++-
 core/connectors/runtime/src/source.rs              |  6 +--
 core/examples/src/multi-tenant/producer/main.rs    |  4 +-
 core/examples/src/new-sdk/producer/main.rs         |  4 +-
 core/examples/src/sink-data-producer/main.rs       |  8 ++--
 core/sdk/src/clients/mod.rs                        |  1 +
 core/sdk/src/clients/producer.rs                   | 39 +++++++++---------
 core/sdk/src/clients/producer_builder.rs           | 14 +++----
 core/sdk/src/clients/producer_config.rs            | 13 +++---
 core/sdk/src/clients/producer_dispatcher.rs        |  4 +-
 core/sdk/src/clients/producer_error_callback.rs    | 10 +++--
 core/sdk/src/clients/producer_sharding.rs          | 47 +++++++++++++++++-----
 core/sdk/src/prelude.rs                            |  2 +-
 .../stream_builder/build/build_iggy_producer.rs    |  6 +--
 14 files changed, 99 insertions(+), 66 deletions(-)

diff --git a/core/common/src/error/iggy_error.rs 
b/core/common/src/error/iggy_error.rs
index d8b8d005..cc8a284b 100644
--- a/core/common/src/error/iggy_error.rs
+++ b/core/common/src/error/iggy_error.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
 use strum::{EnumDiscriminants, FromRepr, IntoStaticStr};
 use thiserror::Error;
 
-#[derive(Clone, Debug, Error, EnumDiscriminants, IntoStaticStr, FromRepr)]
+#[derive(Clone, Debug, Error, EnumDiscriminants, IntoStaticStr, FromRepr, 
Default)]
 #[repr(u32)]
 #[strum(serialize_all = "snake_case")]
 #[strum_discriminants(
@@ -31,6 +31,7 @@ use thiserror::Error;
     strum(serialize_all = "snake_case")
 )]
 pub enum IggyError {
+    #[default]
     #[error("Error")]
     Error = 1,
     #[error("Invalid configuration")]
@@ -375,8 +376,10 @@ pub enum IggyError {
     BackgroundSendBufferOverflow = 4055,
     #[error("Producer send failed")]
     ProducerSendFailed {
-        cause: String,
+        cause: Box<IggyError>,
         failed: Arc<Vec<IggyMessage>>,
+        stream_name: String,
+        topic_name: String,
     } = 4056,
     #[error("Producer closed")]
     ProducerClosed = 4057,
diff --git a/core/connectors/runtime/src/source.rs 
b/core/connectors/runtime/src/source.rs
index b6cdeb4d..6bc6a526 100644
--- a/core/connectors/runtime/src/source.rs
+++ b/core/connectors/runtime/src/source.rs
@@ -20,7 +20,7 @@ use dashmap::DashMap;
 use dlopen2::wrapper::Container;
 use flume::{Receiver, Sender};
 use iggy::prelude::{
-    HeaderKey, HeaderValue, IggyClient, IggyDuration, IggyError, IggyMessage, 
SyncConfig,
+    DirectConfig, HeaderKey, HeaderValue, IggyClient, IggyDuration, IggyError, 
IggyMessage,
 };
 use iggy_connector_sdk::{
     DecodedMessage, Error, ProducedMessages, StreamEncoder, TopicMetadata, 
transforms::Transform,
@@ -120,8 +120,8 @@ pub async fn init(
             let batch_length = stream.batch_length.unwrap_or(1000);
             let producer = iggy_client
                 .producer(&stream.stream, &stream.topic)?
-                .sync(
-                    SyncConfig::builder()
+                .direct(
+                    DirectConfig::builder()
                         .batch_length(batch_length)
                         .linger_time(linger_time)
                         .build(),
diff --git a/core/examples/src/multi-tenant/producer/main.rs 
b/core/examples/src/multi-tenant/producer/main.rs
index 5931f1b8..456b1c29 100644
--- a/core/examples/src/multi-tenant/producer/main.rs
+++ b/core/examples/src/multi-tenant/producer/main.rs
@@ -262,8 +262,8 @@ async fn create_producers(
         for id in 1..=producers_count {
             let producer = client
                 .producer(stream, topic)?
-                .sync(
-                    SyncConfig::builder()
+                .direct(
+                    DirectConfig::builder()
                         .batch_length(batch_length)
                         
.linger_time(IggyDuration::from_str(interval).expect("Invalid duration"))
                         .build(),
diff --git a/core/examples/src/new-sdk/producer/main.rs 
b/core/examples/src/new-sdk/producer/main.rs
index 62d2b827..8f4b7cf5 100644
--- a/core/examples/src/new-sdk/producer/main.rs
+++ b/core/examples/src/new-sdk/producer/main.rs
@@ -46,8 +46,8 @@ async fn main() -> anyhow::Result<(), Box<dyn Error>> {
     let interval = IggyDuration::from_str(&args.interval)?;
     let producer = client
         .producer(&args.stream_id, &args.topic_id)?
-        .sync(
-            SyncConfig::builder()
+        .direct(
+            DirectConfig::builder()
                 .batch_length(args.messages_per_batch)
                 .linger_time(interval)
                 .build(),
diff --git a/core/examples/src/sink-data-producer/main.rs 
b/core/examples/src/sink-data-producer/main.rs
index e97b51fb..d8a63f16 100644
--- a/core/examples/src/sink-data-producer/main.rs
+++ b/core/examples/src/sink-data-producer/main.rs
@@ -20,8 +20,8 @@ use std::{env, str::FromStr, time::Duration};
 
 use chrono::{DateTime, Days, Utc};
 use iggy::prelude::{
-    Client, IggyClient, IggyClientBuilder, IggyDuration, IggyError, 
IggyMessage, Partitioning,
-    SyncConfig,
+    Client, DirectConfig, IggyClient, IggyClientBuilder, IggyDuration, 
IggyError, IggyMessage,
+    Partitioning,
 };
 use rand::{
     Rng,
@@ -58,8 +58,8 @@ async fn main() -> Result<(), DataProducerError> {
     let client = create_client(&address, &username, &password).await?;
     let producer = client
         .producer(&stream, &topic)?
-        .sync(
-            SyncConfig::builder()
+        .direct(
+            DirectConfig::builder()
                 .batch_length(1000)
                 .linger_time(IggyDuration::from_str("5ms").unwrap())
                 .build(),
diff --git a/core/sdk/src/clients/mod.rs b/core/sdk/src/clients/mod.rs
index e50bc443..9f51cc52 100644
--- a/core/sdk/src/clients/mod.rs
+++ b/core/sdk/src/clients/mod.rs
@@ -39,3 +39,4 @@ pub mod producer_sharding;
 
 const ORDERING: std::sync::atomic::Ordering = 
std::sync::atomic::Ordering::SeqCst;
 const MAX_BATCH_LENGTH: usize = 1000000;
+const MIB: usize = 1_048_576;
diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index 2480ea97..43ecf7b0 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -18,7 +18,7 @@
 use super::ORDERING;
 use crate::clients::MAX_BATCH_LENGTH;
 use crate::clients::producer_builder::SendMode;
-use crate::clients::producer_config::SyncConfig;
+use crate::clients::producer_config::DirectConfig;
 use crate::clients::producer_dispatcher::ProducerDispatcher;
 use bytes::Bytes;
 use futures_util::StreamExt;
@@ -70,7 +70,7 @@ pub struct ProducerCore {
     last_sent_at: Arc<AtomicU64>,
     send_retries_count: Option<u32>,
     send_retries_interval: Option<IggyDuration>,
-    sync_config: Option<SyncConfig>,
+    direct_config: Option<DirectConfig>,
 }
 
 impl ProducerCore {
@@ -345,6 +345,15 @@ impl ProducerCore {
         );
         sleep(Duration::from_micros(remaining)).await;
     }
+
+    fn make_failed_error(&self, cause: IggyError, failed: Vec<IggyMessage>) -> 
IggyError {
+        IggyError::ProducerSendFailed {
+            cause: Box::new(cause),
+            failed: Arc::new(failed),
+            stream_name: self.stream_name.clone(),
+            topic_name: self.topic_name.clone(),
+        }
+    }
 }
 
 impl ProducerCoreBackend for ProducerCore {
@@ -360,23 +369,17 @@ impl ProducerCoreBackend for ProducerCore {
         }
 
         if let Err(err) = self.encrypt_messages(&mut msgs) {
-            return Err(IggyError::ProducerSendFailed {
-                cause: err.to_string(),
-                failed: Arc::new(msgs),
-            });
+            return Err(self.make_failed_error(err, msgs));
         }
 
         let part = match self.get_partitioning(stream, topic, &msgs, 
partitioning.clone()) {
             Ok(p) => p,
             Err(err) => {
-                return Err(IggyError::ProducerSendFailed {
-                    cause: err.to_string(),
-                    failed: Arc::new(msgs),
-                });
+                return Err(self.make_failed_error(err, msgs));
             }
         };
 
-        match &self.sync_config {
+        match &self.direct_config {
             Some(cfg) => {
                 let linger_time_micros = cfg.linger_time.as_micros();
                 if linger_time_micros > 0 {
@@ -396,10 +399,7 @@ impl ProducerCoreBackend for ProducerCore {
 
                     if let Err(err) = self.try_send_messages(stream, topic, 
&part, chunk).await {
                         let failed_tail = msgs.split_off(index);
-                        return Err(IggyError::ProducerSendFailed {
-                            cause: err.to_string(),
-                            failed: Arc::new(failed_tail),
-                        });
+                        return Err(self.make_failed_error(err, failed_tail));
                     }
                     self.last_sent_at
                         .store(IggyTimestamp::now().into(), ORDERING);
@@ -410,10 +410,7 @@ impl ProducerCoreBackend for ProducerCore {
             _ => {
                 self.try_send_messages(stream, topic, &part, &mut msgs)
                     .await
-                    .map_err(|err| IggyError::ProducerSendFailed {
-                        cause: err.to_string(),
-                        failed: Arc::new(msgs),
-                    })?;
+                    .map_err(|err| self.make_failed_error(err, msgs))?;
                 self.last_sent_at
                     .store(IggyTimestamp::now().into(), ORDERING);
             }
@@ -473,8 +470,8 @@ impl IggyProducer {
             last_sent_at: Arc::new(AtomicU64::new(0)),
             send_retries_count,
             send_retries_interval,
-            sync_config: match mode {
-                SendMode::Sync(ref cfg) => Some(cfg.clone()),
+            direct_config: match mode {
+                SendMode::Direct(ref cfg) => Some(cfg.clone()),
                 _ => None,
             },
         });
diff --git a/core/sdk/src/clients/producer_builder.rs 
b/core/sdk/src/clients/producer_builder.rs
index 10e58f77..fcd0c4a0 100644
--- a/core/sdk/src/clients/producer_builder.rs
+++ b/core/sdk/src/clients/producer_builder.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::clients::producer_config::{BackgroundConfig, SyncConfig};
+use crate::clients::producer_config::{BackgroundConfig, DirectConfig};
 use crate::prelude::IggyProducer;
 use iggy_binary_protocol::Client;
 use iggy_common::locking::IggySharedMut;
@@ -25,13 +25,13 @@ use iggy_common::{
 use std::sync::Arc;
 
 pub enum SendMode {
-    Sync(SyncConfig),
+    Direct(DirectConfig),
     Background(BackgroundConfig),
 }
 
 impl Default for SendMode {
     fn default() -> Self {
-        SendMode::Sync(SyncConfig::builder().build())
+        SendMode::Direct(DirectConfig::builder().build())
     }
 }
 
@@ -198,15 +198,15 @@ impl IggyProducerBuilder {
         }
     }
 
-    /// Sets the producer to use synchronous (direct) message sending.
+    /// Sets the producer to use direct message sending.
     /// This mode ensures that messages are sent immediately to the server
     /// without being buffered or delayed.
-    pub fn sync(mut self, config: SyncConfig) -> Self {
-        self.mode = SendMode::Sync(config);
+    pub fn direct(mut self, config: DirectConfig) -> Self {
+        self.mode = SendMode::Direct(config);
         self
     }
 
-    /// Sets the producer to use asynchronous (background) message sending.
+    /// Sets the producer to use background message sending.
     /// This mode buffers messages and sends them in the background.
     pub fn background(mut self, config: BackgroundConfig) -> Self {
         self.mode = SendMode::Background(config);
diff --git a/core/sdk/src/clients/producer_config.rs 
b/core/sdk/src/clients/producer_config.rs
index 8fe83ed2..dd3f301d 100644
--- a/core/sdk/src/clients/producer_config.rs
+++ b/core/sdk/src/clients/producer_config.rs
@@ -15,6 +15,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use crate::clients::MIB;
 use crate::clients::producer_error_callback::{ErrorCallback, LogErrorCallback};
 use crate::clients::producer_sharding::{BalancedSharding, Sharding};
 use bon::Builder;
@@ -87,7 +88,7 @@ pub struct BackgroundConfig {
     pub sharding: Box<dyn Sharding + Send + Sync>,
     /// Maximum **total size in bytes** of a batch.  
     /// `0` ⇒ unlimited (size-based batching disabled).
-    #[builder(default = 1_048_576)]
+    #[builder(default = MIB)]
     pub batch_size: usize,
     /// Maximum **number of messages** per batch.  
     /// `0` ⇒ unlimited (length-based batching disabled).
@@ -98,7 +99,7 @@ pub struct BackgroundConfig {
     pub failure_mode: BackpressureMode,
     /// Upper bound for the **bytes held in memory** across *all* shards.  
     /// `IggyByteSize::from(0)` ⇒ unlimited.
-    #[builder(default = IggyByteSize::from(32 * 1_048_576))]
+    #[builder(default = IggyByteSize::from(32 * MIB as u64))]
     pub max_buffer_size: IggyByteSize,
     /// Maximum number of **in-flight requests** (batches being sent).  
     /// `0` ⇒ unlimited.
@@ -114,20 +115,20 @@ pub struct BackgroundConfig {
 /// use iggy_common::IggyDuration;
 ///
 /// // Send messages one-by-one (max latency, min memory per request)
-/// let cfg = SyncConfig::builder()
+/// let cfg = DirectConfig::builder()
 ///     .batch_length(1)
 ///     .linger_time(IggyDuration::from(0))
 ///     .build();
 ///
 /// // Send in chunks of up to 500 messages,
 /// // with a delay of at least 200 ms between consecutive sends.
-/// let cfg = SyncConfig::builder()
+/// let cfg = DirectConfig::builder()
 ///     .batch_length(500)
 ///     .linger_time(IggyDuration::from(200))
 ///     .build();
 /// ```
 #[derive(Clone, Builder)]
-pub struct SyncConfig {
+pub struct DirectConfig {
     /// Maximum number of messages to pack into **one** synchronous request.
     /// `0` ⇒ MAX_BATCH_LENTH().
     #[builder(default = 1000)]
@@ -139,5 +140,5 @@ pub struct SyncConfig {
 
 fn default_shard_count() -> usize {
     let cpus = num_cpus::get();
-    cpus.clamp(2, 16)
+    cpus.clamp(2, 64)
 }
diff --git a/core/sdk/src/clients/producer_dispatcher.rs 
b/core/sdk/src/clients/producer_dispatcher.rs
index 5ef07158..fa6dded4 100644
--- a/core/sdk/src/clients/producer_dispatcher.rs
+++ b/core/sdk/src/clients/producer_dispatcher.rs
@@ -425,8 +425,10 @@ mod tests {
             .returning(|_, _, _, _| {
                 Box::pin(async {
                     Err(IggyError::ProducerSendFailed {
-                        cause: "some_error".to_string(),
+                        cause: Box::new(IggyError::Error),
                         failed: Arc::new(vec![dummy_message(10)]),
+                        stream_name: "1".to_string(),
+                        topic_name: "1".to_string(),
                     })
                 })
             });
diff --git a/core/sdk/src/clients/producer_error_callback.rs 
b/core/sdk/src/clients/producer_error_callback.rs
index bfb16ac2..b8ce5890 100644
--- a/core/sdk/src/clients/producer_error_callback.rs
+++ b/core/sdk/src/clients/producer_error_callback.rs
@@ -15,7 +15,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-use iggy_common::{Identifier, IggyMessage, Partitioning};
+use iggy_common::{Identifier, IggyError, IggyMessage, Partitioning};
 use std::fmt::Debug;
 use std::pin::Pin;
 use std::sync::Arc;
@@ -23,9 +23,11 @@ use tracing::error;
 
 #[derive(Debug)]
 pub struct ErrorCtx {
-    pub cause: String,
+    pub cause: Box<IggyError>,
     pub stream: Arc<Identifier>,
+    pub stream_name: String,
     pub topic: Arc<Identifier>,
+    pub topic_name: String,
     pub partitioning: Option<Arc<Partitioning>>,
     pub messages: Arc<Vec<IggyMessage>>,
 }
@@ -54,9 +56,11 @@ impl ErrorCallback for LogErrorCallback {
                 .unwrap_or_else(|| "None".to_string());
 
             error!(
-                cause = ctx.cause,
+                cause = %ctx.cause,
                 stream = %ctx.stream,
+                stream_name = ctx.stream_name,
                 topic = %ctx.topic,
+                topic_name = ctx.topic_name,
                 partitioning = %partitioning,
                 num_messages = ctx.messages.len(),
                 "Failed to send messages in background task",
diff --git a/core/sdk/src/clients/producer_sharding.rs 
b/core/sdk/src/clients/producer_sharding.rs
index 5cbe45f5..49c2e17a 100644
--- a/core/sdk/src/clients/producer_sharding.rs
+++ b/core/sdk/src/clients/producer_sharding.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use tokio::sync::{OwnedSemaphorePermit, broadcast};
 use tokio::task::JoinHandle;
-use tracing::error;
+use tracing::{debug, error};
 
 /// A strategy for distributing messages across shards.
 ///
@@ -130,12 +130,31 @@ impl Shard {
                             Ok(msg) => {
                                 buffer_bytes += 
msg.inner.get_size_bytes().as_bytes_usize();
                                 buffer.push(msg);
+                                debug!(
+                                    buffer_len = buffer.len(),
+                                    buffer_bytes,
+                                    "Added message to buffer"
+                                );
 
                                 let exceed_batch_len = config.batch_length != 
0 && buffer.len() >= config.batch_length;
                                 let exceed_batch_size = config.batch_size != 0 
&& buffer_bytes >= config.batch_size;
 
                                 if exceed_batch_len || exceed_batch_size {
+                                    debug!(
+                                        exceed_batch_len,
+                                        exceed_batch_size,
+                                        "Flushing buffer (trigger: 
batch_len={}, batch_size={})",
+                                        exceed_batch_len,
+                                        exceed_batch_size,
+                                    );
+
                                     Self::flush_buffer(&core, &mut buffer, 
&mut buffer_bytes, &err_sender).await;
+                                    debug!(
+                                        new_buffer_len = buffer.len(),
+                                        new_buffer_bytes = buffer_bytes,
+                                        "Buffer flushed"
+                                    );
+
                                     last_flush = tokio::time::Instant::now();
                                 }
                             }
@@ -183,11 +202,19 @@ impl Shard {
                 .await;
 
             if let Err(err) = result {
-                if let IggyError::ProducerSendFailed { failed, cause } = &err {
+                if let IggyError::ProducerSendFailed {
+                    failed,
+                    cause,
+                    stream_name,
+                    topic_name,
+                } = &err
+                {
                     let ctx = ErrorCtx {
-                        cause: cause.clone(),
+                        cause: cause.to_owned(),
                         stream: msg.inner.stream,
+                        stream_name: stream_name.clone(),
                         topic: msg.inner.topic,
+                        topic_name: topic_name.clone(),
                         partitioning: msg.inner.partitioning,
                         messages: failed.clone(),
                     };
@@ -250,12 +277,8 @@ mod tests {
             Arc::new(Semaphore::new(100)),
         );
 
-        let shard = Shard::new(
-            Arc::new(mock),
-            config,
-            flume::unbounded().0,
-            broadcast::channel(1).1,
-        );
+        let (_stop_tx, stop_rx) = broadcast::channel(1);
+        let shard = Shard::new(Arc::new(mock), config, flume::unbounded().0, 
stop_rx);
 
         for _ in 0..10 {
             let message = ShardMessage {
@@ -358,7 +381,9 @@ mod tests {
         let mut mock = MockProducerCoreBackend::new();
         let error = IggyError::ProducerSendFailed {
             failed: Arc::new(vec![dummy_message(1)]),
-            cause: "test error".into(),
+            cause: Box::new(IggyError::Error),
+            stream_name: "1".to_string(),
+            topic_name: "1".to_string(),
         };
 
         mock.expect_send_internal().returning(move |_, _, _, _| {
@@ -392,7 +417,7 @@ mod tests {
         shard.send(wrapped).await.unwrap();
 
         let err_ctx = err_rx.recv_async().await.unwrap();
-        assert_eq!(err_ctx.cause, "test error");
+        assert_eq!(err_ctx.cause, Box::new(IggyError::Error));
         assert_eq!(err_ctx.messages.len(), 1);
     }
 
diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs
index 62d91e12..b2afacbb 100644
--- a/core/sdk/src/prelude.rs
+++ b/core/sdk/src/prelude.rs
@@ -37,7 +37,7 @@ pub use crate::clients::consumer::{
 pub use crate::clients::consumer_builder::IggyConsumerBuilder;
 pub use crate::clients::producer::IggyProducer;
 pub use crate::clients::producer_builder::IggyProducerBuilder;
-pub use crate::clients::producer_config::{BackgroundConfig, SyncConfig};
+pub use crate::clients::producer_config::{BackgroundConfig, DirectConfig};
 pub use crate::consumer_ext::IggyConsumerMessageExt;
 pub use crate::stream_builder::IggyConsumerConfig;
 pub use crate::stream_builder::IggyStreamConsumer;
diff --git a/core/sdk/src/stream_builder/build/build_iggy_producer.rs 
b/core/sdk/src/stream_builder/build/build_iggy_producer.rs
index 59a138cc..9522452b 100644
--- a/core/sdk/src/stream_builder/build/build_iggy_producer.rs
+++ b/core/sdk/src/stream_builder/build/build_iggy_producer.rs
@@ -18,7 +18,7 @@
 
 use crate::clients::client::IggyClient;
 use crate::clients::producer::IggyProducer;
-use crate::clients::producer_config::SyncConfig;
+use crate::clients::producer_config::DirectConfig;
 use crate::prelude::{IggyError, IggyExpiry, MaxTopicSize};
 use crate::stream_builder::IggyProducerConfig;
 use tracing::{error, trace};
@@ -66,8 +66,8 @@ pub(crate) async fn build_iggy_producer(
             IggyExpiry::ServerDefault,
             MaxTopicSize::ServerDefault,
         )
-        .sync(
-            SyncConfig::builder()
+        .direct(
+            DirectConfig::builder()
                 .batch_length(batch_length)
                 .linger_time(linger_time)
                 .build(),

Reply via email to