This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch feat/add-background-send
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit bacd54ee1f4f235900dda4aa80a02f6dbeeeb192
Author: haze518 <[email protected]>
AuthorDate: Sat May 24 10:33:08 2025 +0600

    del
---
 Cargo.lock                       |  1 +
 core/sdk/Cargo.toml              |  1 +
 core/sdk/src/clients/producer.rs | 43 +++++++++++++++++++++++++++++++++++++++-
 3 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/Cargo.lock b/Cargo.lock
index f5ed8db8..9f632ea7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3547,6 +3547,7 @@ dependencies = [
  "futures-util",
  "iggy_binary_protocol",
  "iggy_common",
+ "num_cpus",
  "quinn",
  "reqwest",
  "reqwest-middleware",
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index c763af12..32d1cb2f 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -48,6 +48,7 @@ futures = { workspace = true }
 futures-util = { workspace = true }
 iggy_binary_protocol = { workspace = true }
 iggy_common = { workspace = true }
+num_cpus = "1.16.0"
 quinn = { workspace = true }
 reqwest = { workspace = true }
 reqwest-middleware = { workspace = true }
diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index c0457538..9ba3026d 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -34,6 +34,25 @@ use tokio::sync::Semaphore;
 use tokio::task::JoinHandle;
 use tokio::time::{Interval, sleep};
 use tracing::{error, info, trace, warn};
+use dashmap::DashMap;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering;
+
+/// One background worker ("shard") of the producer.
+///
+/// Bundles the sending half of a bounded `flume` channel with the handle of the
+/// Tokio task that drains the receiving half.
+///
+/// NOTE(review): Rust naming convention would spell this `Shard`; the name is left
+/// unchanged here so that any reference outside this block keeps compiling.
+struct shard {
+    /// Sending half of the shard's channel; every item is one batch of messages.
+    tx: flume::Sender<Vec<IggyMessage>>,
+    /// Handle of the task spawned in `new`; nothing in this block reads it.
+    _join_handle: JoinHandle<()>,
+}
+
+impl shard {
+    /// Creates the shard's channel, spawns its worker task and returns the shard.
+    ///
+    /// `id` is accepted but not used yet.
+    ///
+    /// The worker is started with `tokio::spawn`, so this must be called from
+    /// inside a Tokio runtime (see the Tokio documentation for `spawn`).
+    fn new(id: usize) -> Self {
+        // todo: make the channel capacity configurable
+        let (tx, rx) = flume::bounded::<Vec<IggyMessage>>(10);
+        let handle = tokio::spawn(async move {
+            // The loop stops at the first `Err` returned by `recv_async`.
+            // todo: switch to `match`
+            while let Ok(_batch) = rx.recv_async().await {
+                // todo: handling of the received batch is not implemented yet
+            }
+        });
+
+        // Previously the function ended after the spawn and returned nothing,
+        // which does not satisfy the declared `-> Self`.
+        Self {
+            tx,
+            _join_handle: handle,
+        }
+    }
+}
 
 pub trait ErrorCallback: Send + Sync + Debug {
     fn call(&self, error: IggyError, messages: Vec<IggyMessage>);
@@ -72,6 +91,8 @@ pub struct IggyProducer {
     sema: Arc<Semaphore>,
     sender: Option<Arc<flume::Sender<Vec<IggyMessage>>>>,
     error_callback: Option<Arc<dyn ErrorCallback>>,
+    shard_number: usize,
+    // todo: add ShardStrategy
 }
 
 impl IggyProducer {
@@ -127,6 +148,7 @@ impl IggyProducer {
             sema: Arc::new(Semaphore::new(10)),
             sender: None,
             error_callback,
+            shard_number: default_shard_count(),
         }
     }
 
@@ -210,7 +232,9 @@ impl IggyProducer {
         let send_retries_count = self.send_retries_count.clone();
         let send_retries_interval = self.send_retries_interval.clone();
         let can_send = self.can_send.clone();
-        let sema = self.sema.clone();
+        let send_batches = AtomicUsize::new(0);
+        let num_shard = self.shard_number;
+        // let sema = self.sema.clone();
 
         let handle = tokio::spawn(async move {
             while let Ok(mut batch) = rx.recv_async().await {
@@ -223,6 +247,10 @@ impl IggyProducer {
                 // let client = client.clone();
                 // let can_send = can_send.clone();
 
+                // if round_robin
+                let shard_ix = send_batches.fetch_add(1, Ordering::SeqCst) % num_shard;
+
+
                 let partitioning = get_partitioning(
                     &partitioner,
                     &stream_id,
@@ -789,3 +817,16 @@ async fn try_send_messages_new(
         }
     }
 }
+
+/// Default number of shards: the value of `num_cpus::get()` clamped to `2..=16`.
+fn default_shard_count() -> usize {
+    const MIN_SHARDS: usize = 2;
+    const MAX_SHARDS: usize = 16;
+
+    num_cpus::get().clamp(MIN_SHARDS, MAX_SHARDS)
+}
+
+/// Returns the bytes used as the default sharding key for `msg`.
+///
+/// When `msg.user_headers` is `Some`, the key is at most the first 16 bytes of the
+/// headers (the payload is not consulted, even if the headers are empty).
+/// Otherwise the key is at most the first 16 bytes of `msg.payload`.
+fn default_sharder(msg: &IggyMessage) -> &[u8] {
+    const MAX_KEY_LEN: usize = 16;
+
+    match &msg.user_headers {
+        Some(headers) => {
+            let key_len = headers.len().min(MAX_KEY_LEN);
+            &headers[..key_len]
+        }
+        None => {
+            let key_len = msg.payload.len().min(MAX_KEY_LEN);
+            &msg.payload[..key_len]
+        }
+    }
+}

Reply via email to