This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch feat/add-background-send in repository https://gitbox.apache.org/repos/asf/iggy.git
commit bacd54ee1f4f235900dda4aa80a02f6dbeeeb192 Author: haze518 <[email protected]> AuthorDate: Sat May 24 10:33:08 2025 +0600 del --- Cargo.lock | 1 + core/sdk/Cargo.toml | 1 + core/sdk/src/clients/producer.rs | 43 +++++++++++++++++++++++++++++++++++++++- 3 files changed, 44 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index f5ed8db8..9f632ea7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3547,6 +3547,7 @@ dependencies = [ "futures-util", "iggy_binary_protocol", "iggy_common", + "num_cpus", "quinn", "reqwest", "reqwest-middleware", diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml index c763af12..32d1cb2f 100644 --- a/core/sdk/Cargo.toml +++ b/core/sdk/Cargo.toml @@ -48,6 +48,7 @@ futures = { workspace = true } futures-util = { workspace = true } iggy_binary_protocol = { workspace = true } iggy_common = { workspace = true } +num_cpus = "1.16.0" quinn = { workspace = true } reqwest = { workspace = true } reqwest-middleware = { workspace = true } diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs index c0457538..9ba3026d 100644 --- a/core/sdk/src/clients/producer.rs +++ b/core/sdk/src/clients/producer.rs @@ -34,6 +34,25 @@ use tokio::sync::Semaphore; use tokio::task::JoinHandle; use tokio::time::{Interval, sleep}; use tracing::{error, info, trace, warn}; +use dashmap::DashMap; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; + +struct shard { + tx: flume::Sender<Vec<IggyMessage>>, + _join_handle: JoinHandle<()>, +} + +impl shard { + fn new(id: usize) -> Self { + let (tx, rx) = flume::bounded::<Vec<IggyMessage>>(10); // todo добавить размер в конфигурацию + let handle = tokio::spawn(async move { + while let Ok(message) = rx.recv_async().await { // todo поменять на match + + } + }); + } +} pub trait ErrorCallback: Send + Sync + Debug { fn call(&self, error: IggyError, messages: Vec<IggyMessage>); @@ -72,6 +91,8 @@ pub struct IggyProducer { sema: Arc<Semaphore>, sender: Option<Arc<flume::Sender<Vec<IggyMessage>>>>, error_callback: Option<Arc<dyn ErrorCallback>>, + shard_number: usize, + // todo добавить ShardStrategy } impl IggyProducer { @@ -127,6 +148,7 @@ impl IggyProducer { sema: Arc::new(Semaphore::new(10)), sender: None, error_callback, + shard_number: default_shard_count(), } } @@ -210,7 +232,9 @@ impl IggyProducer { let send_retries_count = self.send_retries_count.clone(); let send_retries_interval = self.send_retries_interval.clone(); let can_send = self.can_send.clone(); - let sema = self.sema.clone(); + let send_batches = AtomicUsize::new(0); + let num_shard = self.shard_number; + // let sema = self.sema.clone(); let handle = tokio::spawn(async move { while let Ok(mut batch) = rx.recv_async().await { @@ -223,6 +247,10 @@ impl IggyProducer { // let client = client.clone(); // let can_send = can_send.clone(); + // if round_robin + let shard_ix = send_batches.fetch_add(1, Ordering::SeqCst) % num_shard; + + let partitioning = get_partitioning( &partitioner, &stream_id, @@ -789,3 +817,16 @@ async fn try_send_messages_new( } } } + +fn default_shard_count() -> usize { + let cpus = num_cpus::get(); + cpus.clamp(2, 16) +} + +fn default_sharder(msg: &IggyMessage) -> &[u8] { + if let Some(h) = &msg.user_headers { + return &h[..h.len().min(16)]; + } + + &msg.payload[..msg.payload.len().min(16)] +}
