This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch feat/add-background-send
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 05468f8a173802c4b8984179106c0e24723f5505
Author: haze518 <[email protected]>
AuthorDate: Tue Jun 3 22:07:43 2025 +0600

    del
---
 .../tests/examples/test_new_publisher.rs           |  69 +++---
 core/sdk/src/clients/mod.rs                        |   3 +
 core/sdk/src/clients/producer.rs                   |  72 +-----
 core/sdk/src/clients/producer_builder.rs           |  25 +-
 core/sdk/src/clients/producer_config.rs            |  52 ++++
 core/sdk/src/clients/producer_dispatcher.rs        | 266 +++------------------
 core/sdk/src/clients/producer_error_callback.rs    |  70 ++++++
 core/sdk/src/clients/producer_sharding.rs          | 200 ++++++++++++++++
 8 files changed, 403 insertions(+), 354 deletions(-)

diff --git a/core/integration/tests/examples/test_new_publisher.rs b/core/integration/tests/examples/test_new_publisher.rs
index af3ba077..2c0e0e84 100644
--- a/core/integration/tests/examples/test_new_publisher.rs
+++ b/core/integration/tests/examples/test_new_publisher.rs
@@ -43,6 +43,7 @@ async fn test_new_publisher() {
     let producer = client
         .producer("1", "1")
         .unwrap()
+        // .background(|b| b.batch_length(10))
         .sync(|b| b.batch_length(10))
         // .send_mode(SendMode::Background)
         .build();
@@ -50,7 +51,7 @@ async fn test_new_publisher() {
     producer.init().await.unwrap();
 
     let mut t = Vec::new();
-    let batches_to_send = 10_000;
+    let batches_to_send = 10_00;
     let messages_per_batch = 10;
     let total_expected = batches_to_send * messages_per_batch;
 
@@ -71,37 +72,37 @@ async fn test_new_publisher() {
         t.push(start.elapsed().as_millis());
     }
 
-    let mut consumer = client
-        .consumer_group("some-consumer", "1", "1")
-        .unwrap()
-        .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
-        .create_consumer_group_if_not_exists()
-        .auto_join_consumer_group()
-        .polling_strategy(PollingStrategy::next())
-        .poll_interval(IggyDuration::from_str("1ms").unwrap())
-        .batch_length(10)
-        .build();
-
-    consumer.init().await.unwrap();
-
-    let mut received = 0;
-    while let Some(msg) = consumer.next().await {
-        match msg {
-            Ok(_) => {
-                received += 1;
-                if received >= total_expected {
-                    break;
-                }
-            }
-            Err(e) => panic!("Consumer error: {}", e),
-        }
-    }
-
-    assert_eq!(
-        received, total_expected,
-        "Not all messages received: got {}, expected {}",
-        received, total_expected
-    );
+    // let mut consumer = client
+    //     .consumer_group("some-consumer", "1", "1")
+    //     .unwrap()
+    //     .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
+    //     .create_consumer_group_if_not_exists()
+    //     .auto_join_consumer_group()
+    //     .polling_strategy(PollingStrategy::next())
+    //     .poll_interval(IggyDuration::from_str("1ms").unwrap())
+    //     .batch_length(10)
+    //     .build();
+
+    // consumer.init().await.unwrap();
+
+    // let mut received = 0;
+    // while let Some(msg) = consumer.next().await {
+    //     match msg {
+    //         Ok(_) => {
+    //             received += 1;
+    //             if received >= total_expected {
+    //                 break;
+    //             }
+    //         }
+    //         Err(e) => panic!("Consumer error: {}", e),
+    //     }
+    // }
+
+    // assert_eq!(
+    //     received, total_expected,
+    //     "Not all messages received: got {}, expected {}",
+    //     received, total_expected
+    // );
 
     let total: u128 = t.iter().sum();
     let avg = total as f64 / t.len() as f64;
@@ -111,3 +112,7 @@ async fn test_new_publisher() {
 
 // sync: avg send: 1.561ms; overall: 46.71s
 // async: avg send: 0.356ms; overall: 28.67s
+
+// master sync: 44s
+// new sync: 44.3s
+// new async: 43s
diff --git a/core/sdk/src/clients/mod.rs b/core/sdk/src/clients/mod.rs
index 96916967..f46bc2a4 100644
--- a/core/sdk/src/clients/mod.rs
+++ b/core/sdk/src/clients/mod.rs
@@ -33,6 +33,9 @@ pub mod consumer_builder;
 pub mod producer;
 pub mod producer_builder;
 pub mod producer_dispatcher;
+pub mod producer_config;
+pub mod producer_error_callback;
+pub mod producer_sharding;
 
 const ORDERING: std::sync::atomic::Ordering = 
std::sync::atomic::Ordering::SeqCst;
 const MAX_BATCH_LENGTH: usize = 1000000;
diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index b2014d9f..88a44204 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -1,4 +1,3 @@
-use super::producer_builder::{SendMode, SyncConfig};
 /* Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,9 +15,11 @@ use super::producer_builder::{SendMode, SyncConfig};
  * specific language governing permissions and limitations
  * under the License.
  */
-use super::{MAX_BATCH_LENGTH, ORDERING};
+use super::ORDERING;
 
-use super::producer_dispatcher::{BackgroundConfig, BalancedSharding, 
ProducerDispatcher, Sharding};
+use crate::clients::producer_builder::SendMode;
+use crate::clients::producer_config::SyncConfig;
+use crate::clients::producer_dispatcher::ProducerDispatcher;
 use bytes::Bytes;
 use futures_util::StreamExt;
 use iggy_binary_protocol::Client;
@@ -27,8 +28,6 @@ use iggy_common::{
     CompressionAlgorithm, DiagnosticEvent, EncryptorKind, IdKind, Identifier, 
IggyDuration,
     IggyError, IggyExpiry, IggyMessage, IggyTimestamp, MaxTopicSize, 
Partitioner, Partitioning,
 };
-use std::fmt::Debug;
-use std::pin::Pin;
 use std::sync::Arc;
 use std::sync::atomic::Ordering;
 use std::sync::atomic::{AtomicBool, AtomicU64};
@@ -36,20 +35,6 @@ use std::time::Duration;
 use tokio::time::{Interval, sleep};
 use tracing::{error, info, trace, warn};
 
-pub trait SendStrategy: Send + Sync + 'static {
-    fn send_batch(
-        &self,
-        stream: &Identifier,
-        topic: &Identifier,
-        msgs: Vec<IggyMessage>,
-        partitioning: Option<Arc<Partitioning>>,
-    ) -> impl Future<Output = Result<(), IggyError>> + Send;
-}
-
-pub trait AsyncSendStrategy: SendStrategy {
-    fn flush(&self) -> Result<(), IggyError>;
-}
-
 pub struct ProducerCore {
     initialized: AtomicBool,
     can_send: Arc<AtomicBool>,
@@ -217,7 +202,7 @@ impl ProducerCore {
                     Some(t) => t.as_micros(),
                     None => 0
                 };
-                if !cfg.can_send_immediately && linger_time_micros > 0 {
+                if linger_time_micros > 0 {
                     Self::wait_before_sending(
                         linger_time_micros,
                         self.last_sent_at.load(ORDERING),
@@ -429,53 +414,6 @@ impl ProducerCore {
     }
 }
 
-pub struct ErrorCtx {
-    pub cause: String,
-    pub stream: Arc<Identifier>,
-    pub topic: Arc<Identifier>,
-    pub partitioning: Option<Arc<Partitioning>>,
-    pub messages: Arc<Vec<IggyMessage>>,
-}
-
-pub trait ErrorCallback: Send + Sync + Debug + 'static {
-    fn call(&self, ctx: ErrorCtx) -> Pin<Box<dyn Future<Output = ()> + Send + 
'static>>;
-}
-
-pub struct LogErrorCallback;
-
-impl Default for LogErrorCallback {
-    fn default() -> Self {
-        Self
-    }
-}
-
-impl std::fmt::Debug for LogErrorCallback {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("LogErrorCallback").finish()
-    }
-}
-
-impl ErrorCallback for LogErrorCallback {
-    fn call(&self, ctx: ErrorCtx) -> Pin<Box<dyn Future<Output = ()> + Send + 
'static>> {
-        Box::pin(async move {
-            let partitioning = ctx
-                .partitioning
-                .as_ref()
-                .map(|p| format!("{:?}", p))
-                .unwrap_or_else(|| "None".to_string());
-
-            error!(
-                cause = ctx.cause,
-                stream = %ctx.stream,
-                topic = %ctx.topic,
-                partitioning = %partitioning,
-                num_messages = ctx.messages.len(),
-                "Failed to send messages in background task",
-            );
-        })
-    }
-}
-
 unsafe impl Send for IggyProducer {}
 unsafe impl Sync for IggyProducer {}
 
diff --git a/core/sdk/src/clients/producer_builder.rs b/core/sdk/src/clients/producer_builder.rs
index f92a5da1..6f3e001b 100644
--- a/core/sdk/src/clients/producer_builder.rs
+++ b/core/sdk/src/clients/producer_builder.rs
@@ -16,8 +16,9 @@
 // under the License.
 
 use super::MAX_BATCH_LENGTH;
-use super::producer::LogErrorCallback;
-use super::producer_dispatcher::{BackgroundConfig, BackpressureMode, 
BalancedSharding, Sharding};
+use crate::clients::producer_config::{BackgroundConfig, BackpressureMode, 
SyncConfig};
+use crate::clients::producer_error_callback::LogErrorCallback;
+use crate::clients::producer_sharding::{BalancedSharding, Sharding};
 use crate::prelude::{ErrorCallback, IggyProducer};
 use iggy_binary_protocol::Client;
 use iggy_common::locking::IggySharedMut;
@@ -27,17 +28,8 @@ use iggy_common::{
 };
 use std::sync::Arc;
 
-// TODO move here BackgroundConfig
-
-#[derive(Clone)]
-pub struct SyncConfig {
-    pub batch_length: usize,
-    pub linger_time: Option<IggyDuration>,
-    pub can_send_immediately: bool,
-}
-
 pub struct BackgroundBuilder {
-    max_in_flight: Option<usize>, // TODO rename to shards_count
+    num_shards: Option<usize>,
     batch_size: Option<usize>,
     batch_length: Option<usize>,
     failure_mode: Option<BackpressureMode>,
@@ -51,7 +43,7 @@ pub struct BackgroundBuilder {
 impl Default for BackgroundBuilder {
     fn default() -> Self {
         BackgroundBuilder {
-            max_in_flight: Some(default_shard_count()),
+            num_shards: Some(default_shard_count()),
             sharding: Box::new(BalancedSharding::default()),
             error_callback: Box::new(LogErrorCallback::default()),
             batch_size: Some(1_048_576),
@@ -100,9 +92,9 @@ impl BackgroundBuilder {
         }
     }
 
-    pub fn max_in_flight(self, value: usize) -> Self {
+    pub fn num_shards(self, value: usize) -> Self {
         Self {
-            max_in_flight: Some(value),
+            num_shards: Some(value),
             ..self
         }
     }
@@ -123,7 +115,7 @@ impl BackgroundBuilder {
 
     pub fn build(self) -> BackgroundConfig {
         BackgroundConfig {
-            max_in_flight: self.max_in_flight.unwrap_or(8),
+            num_shards: self.num_shards.unwrap_or(8),
             batch_size: self.batch_size,
             batch_length: self.batch_length,
             failure_mode: self.failure_mode.unwrap_or(BackpressureMode::Block),
@@ -190,7 +182,6 @@ impl SyncBuilder {
         SyncConfig {
             batch_length: self.batch_length.unwrap_or(MAX_BATCH_LENGTH),
             linger_time: self.linger_time,
-            can_send_immediately: self.linger_time.is_some(),
         }
     }
 }
diff --git a/core/sdk/src/clients/producer_config.rs b/core/sdk/src/clients/producer_config.rs
new file mode 100644
index 00000000..0865a05c
--- /dev/null
+++ b/core/sdk/src/clients/producer_config.rs
@@ -0,0 +1,64 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use std::sync::Arc;
+
+use iggy_common::{IggyByteSize, IggyDuration};
+
+use crate::clients::producer_sharding::Sharding;
+use crate::prelude::ErrorCallback;
+
+/// Determines how the `send_messages` API should behave when a problem is encountered.
+#[derive(Debug, Clone)]
+pub enum BackpressureMode {
+    /// Block until the send succeeds
+    Block,
+    /// Block with a timeout, after which the send fails
+    BlockWithTimeout(IggyDuration),
+    /// Fail immediately without retrying
+    FailImmediately,
+}
+
+/// Settings for the background (buffered, sharded) send mode.
+#[derive(Debug)]
+pub struct BackgroundConfig {
+    /// Number of background shards (worker tasks) that batches are spread across.
+    pub num_shards: usize,
+    /// Flush a shard once the bytes it has buffered reach this size.
+    pub batch_size: Option<usize>,
+    /// Flush a shard once it has buffered this many queued batches.
+    pub batch_length: Option<usize>,
+    /// How sends behave under backpressure; see [`BackpressureMode`].
+    pub failure_mode: BackpressureMode,
+    /// Limit on bytes buffered across all shards (`None` presumably means no limit).
+    pub max_buffer_size: Option<IggyByteSize>,
+    /// Maximum time a shard holds buffered batches since its previous flush.
+    pub linger_time: IggyDuration,
+    /// Receives the context of batches that failed to send in the background.
+    pub error_callback: Arc<Box<dyn ErrorCallback + Send + Sync>>,
+    /// Chooses the shard each batch is routed to.
+    pub sharding: Box<dyn Sharding + Send + Sync>,
+}
+
+/// Settings for the synchronous send mode.
+#[derive(Debug, Clone)]
+pub struct SyncConfig {
+    /// Batch length limit for synchronous sends.
+    pub batch_length: usize,
+    /// Optional linger time; a non-zero value makes the producer wait before sending.
+    pub linger_time: Option<IggyDuration>,
+}
diff --git a/core/sdk/src/clients/producer_dispatcher.rs b/core/sdk/src/clients/producer_dispatcher.rs
index 66d811fa..0cc38aa2 100644
--- a/core/sdk/src/clients/producer_dispatcher.rs
+++ b/core/sdk/src/clients/producer_dispatcher.rs
@@ -1,243 +1,34 @@
-use std::{
-    sync::{
-        Arc,
-        atomic::{AtomicBool, AtomicUsize, Ordering},
-    },
-};
-
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::clients::producer::ProducerCore;
+use crate::clients::producer_config::{BackgroundConfig, BackpressureMode};
+use crate::clients::producer_error_callback::ErrorCtx;
+use crate::clients::producer_sharding::{Shard, ShardMessage};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use futures::FutureExt;
 use iggy_common::{
-    Identifier, IggyByteSize, IggyDuration, IggyError, IggyMessage, 
Partitioning, Sizeable,
+    Identifier, IggyError, IggyMessage, Partitioning, Sizeable,
 };
-use tokio::{sync::Notify, task::JoinHandle};
-use tracing::error;
-use super::producer::{ErrorCtx, ProducerCore};
-
-use crate::prelude::ErrorCallback;
-
-#[derive(Debug, Clone)]
-/// Determines how the `send_messages` API should behave when problem is 
encountered
-pub enum BackpressureMode {
-    /// Block until the send succeeds
-    Block,
-    /// Block with a timeout, after which the send fails
-    BlockWithTimeout(IggyDuration),
-    /// Fail immediately without retrying
-    FailImmediately,
-}
-
-#[derive(Debug)]
-pub struct BackgroundConfig {
-    pub max_in_flight: usize,
-    pub batch_size: Option<usize>,
-    pub batch_length: Option<usize>,
-    pub failure_mode: BackpressureMode,
-    pub max_buffer_size: Option<IggyByteSize>,
-    pub linger_time: IggyDuration,
-    pub error_callback: Arc<Box<dyn ErrorCallback + Send + Sync>>,
-    pub sharding: Box<dyn Sharding + Send + Sync>,
-}
-
-#[derive(Debug)]
-pub struct ShardMessage {
-    pub stream: Arc<Identifier>,
-    pub topic: Arc<Identifier>,
-    pub messages: Vec<IggyMessage>,
-    pub partitioning: Option<Arc<Partitioning>>,
-}
-
-impl Sizeable for ShardMessage {
-    fn get_size_bytes(&self) -> IggyByteSize {
-        let mut total = IggyByteSize::new(0);
-
-        total += self.stream.get_size_bytes();
-        total += self.topic.get_size_bytes();
-        for msg in &self.messages {
-            total += msg.get_size_bytes();
-        }
-        total
-    }
-}
-
-pub struct Shard {
-    tx: flume::Sender<ShardMessage>,
-    _handle: JoinHandle<()>,
-}
-
-impl Shard {
-    pub fn new(
-        core: Arc<ProducerCore>,
-        global_buffer: Arc<AtomicUsize>,
-        config: Arc<BackgroundConfig>,
-        notify: Arc<Notify>,
-        err_sender: flume::Sender<ErrorCtx>,
-    ) -> Self {
-        let (tx, rx) = flume::bounded::<ShardMessage>(1024);
-
-        let handle = tokio::spawn(async move {
-            let mut buffer = Vec::new();
-            let mut buffer_bytes = 0;
-            let mut last_flush = tokio::time::Instant::now();
-
-            loop {
-                let deadline = last_flush + config.linger_time.get_duration();
-                tokio::select! {
-                    maybe_msg = rx.recv_async() => {
-                        match maybe_msg {
-                            Ok(msg) => {
-                                buffer_bytes += 
msg.get_size_bytes().as_bytes_usize();
-                                buffer.push(msg);
-
-                                let exceed_batch_len = config.batch_length
-                                    .map_or(false, |len| buffer.len() >= len);
-                                let exceed_batch_size = config.batch_size
-                                    .map_or(false, |size| buffer_bytes >= 
size);
-
-                                if exceed_batch_len || exceed_batch_size {
-                                    Self::flush_buffer(&core, &mut buffer, 
&mut buffer_bytes, &global_buffer, &err_sender, &notify).await;
-                                    last_flush = tokio::time::Instant::now();
-                                }
-                            }
-                            Err(_) => break,
-                        }
-                    }
-                    _ = tokio::time::sleep_until(deadline) => {
-                        if !buffer.is_empty() {
-                            Self::flush_buffer(&core, &mut buffer, &mut 
buffer_bytes, &global_buffer, &err_sender, &notify).await;
-                            last_flush = tokio::time::Instant::now();
-                        }
-                    }
-                }
-            }
-        });
-
-        Self {
-            tx,
-            _handle: handle,
-        }
-    }
-
-    async fn flush_buffer(
-        core: &Arc<ProducerCore>,
-        buffer: &mut Vec<ShardMessage>,
-        buffer_bytes: &mut usize,
-        global_buffer: &Arc<AtomicUsize>,
-        err_sender: &flume::Sender<ErrorCtx>,
-        notify: &Arc<Notify>,
-    ) {
-        for msg in buffer.drain(..) {
-            let size = msg.get_size_bytes().as_bytes_usize();
-            if let Err(err) = core
-                .send_internal(
-                    &msg.stream,
-                    &msg.topic,
-                    msg.messages,
-                    msg.partitioning.clone(),
-                )
-                .await
-            {
-                if let IggyError::ProducerSendFailed { failed, cause } = &err {
-                    let ctx = ErrorCtx {
-                        cause: cause.clone(),
-                        stream: msg.stream,
-                        topic: msg.topic,
-                        partitioning: msg.partitioning,
-                        messages: failed.clone(),
-                    };
-                    if err_sender.send_async(ctx).await.is_err() {
-                        tracing::warn!("error-queue receiver has been dropped; 
lost error report");
-                    }
-                } else {
-                    tracing::error!("background send failed: {err}");
-                }
-            }
-
-            global_buffer.fetch_sub(size, Ordering::Relaxed);
-            notify.notify_waiters();
-        }
-        *buffer_bytes = 0;
-    }
-
-    async fn send_with_block(&self, message: ShardMessage) -> Result<(), 
IggyError> {
-        self.tx.send_async(message).await.map_err(|e| {
-            error!("Failed to send_with_block: {e}");
-            IggyError::BackgroundSendError
-        })
-    }
-
-    async fn send_with_timeout(
-        &self,
-        message: ShardMessage,
-        timeout: IggyDuration,
-    ) -> Result<(), IggyError> {
-        match tokio::time::timeout(timeout.get_duration(), 
self.tx.send_async(message)).await {
-            Ok(Ok(())) => Ok(()),
-            Ok(Err(e)) => {
-                error!("Channel send failed during timeout: {e}");
-                Err(IggyError::BackgroundSendTimeout)
-            }
-            Err(_) => {
-                error!("Timeout elapsed before sending message batch");
-                Err(IggyError::BackgroundSendTimeout)
-            }
-        }
-    }
-
-    fn send_with_fail(&self, message: ShardMessage) -> Result<(), IggyError> {
-        self.tx.try_send(message).map_err(|_| {
-            error!("Channel is full, dropping message batch");
-            IggyError::BackgroundSendError
-        })
-    }
-
-    async fn shutdown(self) {
-        drop(self.tx);
-        let _ = self._handle.await;
-    }
-}
-
-pub trait Sharding: Send + Sync + std::fmt::Debug + 'static {
-    fn pick_shard(
-        &self,
-        shards: &[Shard],
-        messages: &[IggyMessage],
-        stream: &Identifier,
-        topic: &Identifier,
-    ) -> usize;
-}
-
-pub struct BalancedSharding {
-    counter: AtomicUsize,
-}
-
-impl Default for BalancedSharding {
-    fn default() -> Self {
-        Self {
-            counter: AtomicUsize::new(0),
-        }
-    }
-}
-
-impl std::fmt::Debug for BalancedSharding {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("BalancedSharding").finish()
-    }
-}
 
-impl Sharding for BalancedSharding {
-    fn pick_shard(
-        &self,
-        shards: &[Shard],
-        _: &[IggyMessage],
-        _: &Identifier,
-        _: &Identifier,
-    ) -> usize {
-        self.counter.fetch_add(1, Ordering::Relaxed) % shards.len()
-    }
-}
+use tokio::{sync::Notify, task::JoinHandle};
 
 pub struct ProducerDispatcher {
-    core: Arc<ProducerCore>,
     shards: Vec<Shard>,
     global_buffer: Arc<AtomicUsize>,
     notify: Arc<Notify>,
@@ -248,7 +39,7 @@ pub struct ProducerDispatcher {
 
 impl ProducerDispatcher {
     pub fn new(core: Arc<ProducerCore>, config: BackgroundConfig) -> Self {
-        let mut shards = Vec::with_capacity(config.max_in_flight);
+        let mut shards = Vec::with_capacity(config.num_shards);
         let current_buffered_size = Arc::new(AtomicUsize::new(0));
         let notify = Arc::new(Notify::new());
         let config = Arc::new(config);
@@ -267,7 +58,7 @@ impl ProducerDispatcher {
             tracing::debug!("error-callback worker finished");
         });
 
-        for _ in 0..config.max_in_flight {
+        for _ in 0..config.num_shards {
             shards.push(Shard::new(
                 core.clone(),
                 current_buffered_size.clone(),
@@ -278,7 +69,6 @@ impl ProducerDispatcher {
         }
 
         Self {
-            core,
             shards,
             global_buffer: current_buffered_size,
             config,
@@ -355,7 +145,7 @@ impl ProducerDispatcher {
         );
         debug_assert!(shard_ix < self.shards.len());
         let shard = &self.shards[shard_ix];
-        shard.send_with_block(shard_message).await
+        shard.send(shard_message).await
     }
 
     pub async fn shutdown(mut self) {
diff --git a/core/sdk/src/clients/producer_error_callback.rs b/core/sdk/src/clients/producer_error_callback.rs
new file mode 100644
index 00000000..13a2ac45
--- /dev/null
+++ b/core/sdk/src/clients/producer_error_callback.rs
@@ -0,0 +1,69 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use std::fmt::Debug;
+use std::pin::Pin;
+use std::sync::Arc;
+
+use iggy_common::{Identifier, IggyMessage, Partitioning};
+use tracing::error;
+
+/// Details of a batch that failed to send in the background.
+#[derive(Debug)]
+pub struct ErrorCtx {
+    /// Description of why the send failed.
+    pub cause: String,
+    /// Stream the batch was addressed to.
+    pub stream: Arc<Identifier>,
+    /// Topic the batch was addressed to.
+    pub topic: Arc<Identifier>,
+    /// Partitioning requested for the batch, if any.
+    pub partitioning: Option<Arc<Partitioning>>,
+    /// Messages that could not be sent.
+    pub messages: Arc<Vec<IggyMessage>>,
+}
+
+/// Asynchronous handler invoked for background send failures.
+pub trait ErrorCallback: Send + Sync + Debug + 'static {
+    /// Returns a future that handles the failure described by `ctx`.
+    fn call(&self, ctx: ErrorCtx) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
+}
+
+/// Default [`ErrorCallback`] that logs each failure with `tracing::error!`.
+#[derive(Debug, Default)]
+pub struct LogErrorCallback;
+
+impl ErrorCallback for LogErrorCallback {
+    fn call(&self, ctx: ErrorCtx) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
+        Box::pin(async move {
+            let partitioning = ctx
+                .partitioning
+                .as_ref()
+                .map(|p| format!("{:?}", p))
+                .unwrap_or_else(|| "None".to_string());
+
+            error!(
+                cause = ctx.cause,
+                stream = %ctx.stream,
+                topic = %ctx.topic,
+                partitioning = %partitioning,
+                num_messages = ctx.messages.len(),
+                "Failed to send messages in background task",
+            );
+        })
+    }
+}
diff --git a/core/sdk/src/clients/producer_sharding.rs b/core/sdk/src/clients/producer_sharding.rs
new file mode 100644
index 00000000..077809fb
--- /dev/null
+++ b/core/sdk/src/clients/producer_sharding.rs
@@ -0,0 +1,248 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use std::sync::Arc;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+use iggy_common::{Identifier, IggyByteSize, IggyError, IggyMessage, Partitioning, Sizeable};
+use tokio::sync::Notify;
+use tokio::task::JoinHandle;
+use tracing::error;
+
+use crate::clients::producer::ProducerCore;
+use crate::clients::producer_config::BackgroundConfig;
+use crate::clients::producer_error_callback::ErrorCtx;
+
+/// Strategy that picks which background shard receives a batch of messages.
+///
+/// The returned value is used as an index into `shards`, so it must be in range.
+pub trait Sharding: Send + Sync + std::fmt::Debug + 'static {
+    fn pick_shard(
+        &self,
+        shards: &[Shard],
+        messages: &[IggyMessage],
+        stream: &Identifier,
+        topic: &Identifier,
+    ) -> usize;
+}
+
+/// Round-robin [`Sharding`] that ignores message contents and destination.
+#[derive(Default)]
+pub struct BalancedSharding {
+    counter: AtomicUsize,
+}
+
+impl std::fmt::Debug for BalancedSharding {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("BalancedSharding").finish()
+    }
+}
+
+impl Sharding for BalancedSharding {
+    /// Returns the next shard index in round-robin order.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `shards` is empty.
+    fn pick_shard(
+        &self,
+        shards: &[Shard],
+        _: &[IggyMessage],
+        _: &Identifier,
+        _: &Identifier,
+    ) -> usize {
+        self.counter.fetch_add(1, Ordering::Relaxed) % shards.len()
+    }
+}
+
+/// A batch of messages, with its destination, queued for a background shard.
+#[derive(Debug)]
+pub struct ShardMessage {
+    pub stream: Arc<Identifier>,
+    pub topic: Arc<Identifier>,
+    pub messages: Vec<IggyMessage>,
+    pub partitioning: Option<Arc<Partitioning>>,
+}
+
+impl Sizeable for ShardMessage {
+    /// Size of the stream and topic identifiers plus every message in the batch.
+    fn get_size_bytes(&self) -> IggyByteSize {
+        let mut total = IggyByteSize::new(0);
+        total += self.stream.get_size_bytes();
+        total += self.topic.get_size_bytes();
+        for msg in &self.messages {
+            total += msg.get_size_bytes();
+        }
+        total
+    }
+}
+
+/// Background worker that buffers [`ShardMessage`]s and flushes them through
+/// [`ProducerCore`] when a batch limit or the linger time is reached.
+pub struct Shard {
+    tx: flume::Sender<ShardMessage>,
+    handle: JoinHandle<()>,
+}
+
+impl Shard {
+    /// Spawns the shard's worker task and returns a handle for feeding it.
+    ///
+    /// The worker flushes its buffer once it holds `config.batch_length` batches, once
+    /// its buffered bytes reach `config.batch_size`, once `config.linger_time` has
+    /// elapsed since the previous flush, and one last time when its channel closes.
+    pub(crate) fn new(
+        core: Arc<ProducerCore>,
+        global_buffer: Arc<AtomicUsize>,
+        config: Arc<BackgroundConfig>,
+        notify: Arc<Notify>,
+        err_sender: flume::Sender<ErrorCtx>,
+    ) -> Self {
+        let (tx, rx) = flume::bounded::<ShardMessage>(1024);
+
+        let handle = tokio::spawn(async move {
+            let mut buffer = Vec::new();
+            let mut buffer_bytes = 0;
+            let mut last_flush = tokio::time::Instant::now();
+
+            loop {
+                let deadline = last_flush + config.linger_time.get_duration();
+                tokio::select! {
+                    maybe_msg = rx.recv_async() => {
+                        match maybe_msg {
+                            Ok(msg) => {
+                                buffer_bytes += msg.get_size_bytes().as_bytes_usize();
+                                buffer.push(msg);
+
+                                let exceed_batch_len =
+                                    config.batch_length.is_some_and(|len| buffer.len() >= len);
+                                let exceed_batch_size =
+                                    config.batch_size.is_some_and(|size| buffer_bytes >= size);
+
+                                if exceed_batch_len || exceed_batch_size {
+                                    Self::flush_buffer(
+                                        &core,
+                                        &mut buffer,
+                                        &mut buffer_bytes,
+                                        &global_buffer,
+                                        &err_sender,
+                                        &notify,
+                                    )
+                                    .await;
+                                    last_flush = tokio::time::Instant::now();
+                                }
+                            }
+                            Err(_) => {
+                                // Every sender is gone and the channel is drained (e.g. after
+                                // `shutdown`): flush what is still buffered instead of losing it.
+                                if !buffer.is_empty() {
+                                    Self::flush_buffer(
+                                        &core,
+                                        &mut buffer,
+                                        &mut buffer_bytes,
+                                        &global_buffer,
+                                        &err_sender,
+                                        &notify,
+                                    )
+                                    .await;
+                                }
+                                break;
+                            }
+                        }
+                    }
+                    // Arm the linger timer only while something is buffered; otherwise an
+                    // already-elapsed deadline would make this loop spin while idle.
+                    _ = tokio::time::sleep_until(deadline), if !buffer.is_empty() => {
+                        Self::flush_buffer(
+                            &core,
+                            &mut buffer,
+                            &mut buffer_bytes,
+                            &global_buffer,
+                            &err_sender,
+                            &notify,
+                        )
+                        .await;
+                        last_flush = tokio::time::Instant::now();
+                    }
+                }
+            }
+        });
+
+        Self { tx, handle }
+    }
+
+    /// Sends every buffered batch through `core`, subtracting each batch's size from
+    /// `global_buffer` and waking `notify` waiters after it.
+    ///
+    /// `ProducerSendFailed` errors are forwarded to the error-callback queue; any
+    /// other error is only logged.
+    async fn flush_buffer(
+        core: &Arc<ProducerCore>,
+        buffer: &mut Vec<ShardMessage>,
+        buffer_bytes: &mut usize,
+        global_buffer: &Arc<AtomicUsize>,
+        err_sender: &flume::Sender<ErrorCtx>,
+        notify: &Arc<Notify>,
+    ) {
+        for msg in buffer.drain(..) {
+            let size = msg.get_size_bytes().as_bytes_usize();
+            if let Err(err) = core
+                .send_internal(
+                    &msg.stream,
+                    &msg.topic,
+                    msg.messages,
+                    msg.partitioning.clone(),
+                )
+                .await
+            {
+                if let IggyError::ProducerSendFailed { failed, cause } = &err {
+                    let ctx = ErrorCtx {
+                        cause: cause.clone(),
+                        stream: msg.stream,
+                        topic: msg.topic,
+                        partitioning: msg.partitioning,
+                        messages: failed.clone(),
+                    };
+                    if err_sender.send_async(ctx).await.is_err() {
+                        tracing::warn!("error-queue receiver has been dropped; lost error report");
+                    }
+                } else {
+                    tracing::error!("background send failed: {err}");
+                }
+            }
+
+            global_buffer.fetch_sub(size, Ordering::Relaxed);
+            notify.notify_waiters();
+        }
+        *buffer_bytes = 0;
+    }
+
+    /// Queues `message` for this shard, waiting while the shard's channel is full.
+    ///
+    /// Fails with `IggyError::BackgroundSendError` if the worker task has stopped.
+    pub(crate) async fn send(&self, message: ShardMessage) -> Result<(), IggyError> {
+        self.tx.send_async(message).await.map_err(|e| {
+            error!("Failed to send message batch to shard worker: {e}");
+            IggyError::BackgroundSendError
+        })
+    }
+
+    /// Closes the shard's channel and waits for the worker to flush and exit.
+    pub(crate) async fn shutdown(self) {
+        drop(self.tx);
+        let _ = self.handle.await;
+    }
+}

Reply via email to