This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch feat/add-background-send in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 08854aaa46dd6ad0ef8efbacbc69a9676fb7237a Author: haze518 <[email protected]> AuthorDate: Wed May 28 06:21:37 2025 +0600 del --- core/sdk/src/clients/producer.rs | 700 +++++++++++++++++++------------------- core/sdk/src/clients/send_mode.rs | 59 +++- 2 files changed, 403 insertions(+), 356 deletions(-) diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs index 9ba3026d..a8c106a9 100644 --- a/core/sdk/src/clients/producer.rs +++ b/core/sdk/src/clients/producer.rs @@ -1,4 +1,4 @@ -use super::send_mode::{BackpressureMode, SendMode}; +use super::send_mode::{BackgroundConfig, BackpressureMode, Dispatcher, SendMode}; /* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,6 +19,7 @@ use super::send_mode::{BackpressureMode, SendMode}; use super::{MAX_BATCH_LENGTH, ORDERING}; use bytes::Bytes; +use dashmap::DashMap; use futures_util::StreamExt; use iggy_binary_protocol::Client; use iggy_common::locking::{IggySharedMut, IggySharedMutFn}; @@ -28,41 +29,17 @@ use iggy_common::{ }; use std::fmt::Debug; use std::sync::Arc; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; use std::sync::atomic::{AtomicBool, AtomicU64}; use std::time::{Duration, Instant}; use tokio::sync::Semaphore; use tokio::task::JoinHandle; use tokio::time::{Interval, sleep}; use tracing::{error, info, trace, warn}; -use dashmap::DashMap; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering; - -struct shard { - tx: flume::Sender<Vec<IggyMessage>>, - _join_handle: JoinHandle<()>, -} - -impl shard { - fn new(id: usize) -> Self { - let (tx, rx) = flume::bounded::<Vec<IggyMessage>>(10); // todo добавить размер в конфигурацию - let handle = tokio::spawn(async move { - while let Ok(message) = rx.recv_async().await { // todo поменять на match - - } - }); - } -} - -pub trait ErrorCallback: Send + Sync + Debug { - fn call(&self, error: IggyError, messages: Vec<IggyMessage>); -} - -unsafe impl Send for IggyProducer {} -unsafe impl Sync for IggyProducer {} -pub struct IggyProducer { - initialized: bool, +pub(crate) struct ProducerCore { + initialized: AtomicBool, can_send: Arc<AtomicBool>, client: Arc<IggySharedMut<Box<dyn Client>>>, stream_id: Arc<Identifier>, @@ -71,7 +48,6 @@ pub struct IggyProducer { topic_name: String, batch_length: Option<usize>, partitioning: Option<Arc<Partitioning>>, - send_mode: Arc<SendMode>, encryptor: Option<Arc<EncryptorKind>>, partitioner: Option<Arc<dyn Partitioner>>, linger_time_micros: u64, @@ -92,79 +68,14 @@ pub struct IggyProducer { sender: Option<Arc<flume::Sender<Vec<IggyMessage>>>>, error_callback: Option<Arc<dyn ErrorCallback>>, shard_number: usize, - // todo добавить ShardStrategy } -impl IggyProducer { - #[allow(clippy::too_many_arguments)] - pub(crate) fn new( - client: IggySharedMut<Box<dyn Client>>, - stream: Identifier, - stream_name: String, - topic: Identifier, - topic_name: String, - batch_length: Option<usize>, - partitioning: Option<Partitioning>, - encryptor: Option<Arc<EncryptorKind>>, - partitioner: Option<Arc<dyn Partitioner>>, - linger_time: Option<IggyDuration>, - create_stream_if_not_exists: bool, - create_topic_if_not_exists: bool, - topic_partitions_count: u32, - topic_replication_factor: Option<u8>, - topic_message_expiry: IggyExpiry, - topic_max_size: MaxTopicSize, - send_retries_count: Option<u32>, - send_retries_interval: Option<IggyDuration>, - send_mode: SendMode, - error_callback: Option<Arc<dyn ErrorCallback>>, - ) -> Self { - Self { - initialized: false, - client: Arc::new(client), - can_send: Arc::new(AtomicBool::new(true)), - stream_id: Arc::new(stream), - stream_name, - topic_id: Arc::new(topic), - topic_name, - batch_length, - partitioning: partitioning.map(Arc::new), - send_mode: Arc::new(send_mode), - encryptor, - partitioner, - linger_time_micros: linger_time.map_or(0, |i| i.as_micros()), - create_stream_if_not_exists, - create_topic_if_not_exists, - topic_partitions_count, - topic_replication_factor, - topic_message_expiry, - topic_max_size, - default_partitioning: Arc::new(Partitioning::balanced()), - can_send_immediately: linger_time.is_none(), - last_sent_at: Arc::new(AtomicU64::new(0)), - send_retries_count, - send_retries_interval, - _join_handle: None, - sema: Arc::new(Semaphore::new(10)), - sender: None, - error_callback, - shard_number: default_shard_count(), - } - } - - pub fn stream(&self) -> &Identifier { - &self.stream_id - } - - pub fn topic(&self) -> &Identifier { - &self.topic_id - } - - /// Initializes the producer by subscribing to diagnostic events, creating the stream and topic if they do not exist etc. - /// - /// Note: This method must be invoked before producing messages. - pub async fn init(&mut self) -> Result<(), IggyError> { - if self.initialized { +impl ProducerCore { + pub async fn init(&self) -> Result<(), IggyError> { + if self.initialized.compare_exchange( + false, true, + Ordering::SeqCst, Ordering::SeqCst, + ).is_err() { return Ok(()); } @@ -222,85 +133,49 @@ impl IggyProducer { .await?; } - let (tx, rx) = flume::bounded::<Vec<IggyMessage>>(0); // todo задать какое-то значение - let stream_id = self.stream_id.clone(); - let topic_id = self.topic_id.clone(); - let partitioning = self.partitioning.clone(); - let default_partitioning = self.default_partitioning.clone(); - let partitioner = self.partitioner.clone(); - let client = self.client.clone(); - let send_retries_count = self.send_retries_count.clone(); - let send_retries_interval = self.send_retries_interval.clone(); - let can_send = self.can_send.clone(); - let send_batches = AtomicUsize::new(0); - let num_shard = self.shard_number; - // let sema = self.sema.clone(); - - let handle = tokio::spawn(async move { - while let Ok(mut batch) = rx.recv_async().await { - // let sema = sema.clone(); - // let partitioner = partitioner.clone(); - // let stream_id = stream_id.clone(); - // let topic_id = topic_id.clone(); - // let partitioning = partitioning.clone(); - // let default_partitioning = default_partitioning.clone(); - // let client = client.clone(); - // let can_send = can_send.clone(); - - // if round_robin - let shard_ix = send_batches.fetch_add(1, Ordering::SeqCst) % num_shard; - - - let partitioning = get_partitioning( - &partitioner, - &stream_id, - &topic_id, - &batch, - partitioning.clone(), - &partitioning, - default_partitioning.clone(), - ) - .unwrap(); - try_send_messages( - client.clone(), - send_retries_count, - send_retries_interval, - can_send.clone(), - &stream_id, - &topic_id, - &partitioning, - &mut batch, - ) - .await - .unwrap(); - // tokio::spawn(async move { - // let _permit = sema.acquire().await.unwrap(); - // let partitioning = get_partitioning( - // &partitioner, - // &stream_id, - // &topic_id, - // &batch, - // partitioning.clone(), - // &partitioning, - // default_partitioning.clone(), - // ) - // .unwrap(); - // try_send_messages( - // client.clone(), - // send_retries_count, - // send_retries_interval, - // can_send.clone(), - // &stream_id, - // &topic_id, - // &partitioning, - // &mut batch, - // ).await.unwrap(); - // }); - } - }); - self._join_handle = Some(handle); - self.sender = Some(Arc::new(tx)); - self.initialized = true; + // let (tx, rx) = flume::bounded::<Vec<IggyMessage>>(0); // todo задать какое-то значение + // let stream_id = self.stream_id.clone(); + // let topic_id = self.topic_id.clone(); + // let partitioning = self.partitioning.clone(); + // let default_partitioning = self.default_partitioning.clone(); + // let partitioner = self.partitioner.clone(); + // let client = self.client.clone(); + // let send_retries_count = self.send_retries_count.clone(); + // let send_retries_interval = self.send_retries_interval.clone(); + // let send_batches = AtomicUsize::new(0); + // let num_shard = self.shard_number; + // let can_send = self.can_send.clone(); + + // let handle = tokio::spawn(async move { + // while let Ok(mut batch) = rx.recv_async().await { + // // if round_robin + // let shard_ix = send_batches.fetch_add(1, Ordering::SeqCst) % num_shard; + // let partitioning = get_partitioning( + // &partitioner, + // &stream_id, + // &topic_id, + // &batch, + // partitioning.clone(), + // &partitioning, + // default_partitioning.clone(), + // ) + // .unwrap(); + // try_send_messages( + // client.clone(), + // send_retries_count, + // send_retries_interval, + // can_send.clone(), + // &stream_id, + // &topic_id, + // &partitioning, + // &mut batch, + // ) + // .await + // .unwrap(); + // } + // }); + // self._join_handle = Some(handle); + // self.sender = Some(Arc::new(tx)); info!( "Producer has been initialized for stream: {} and topic: {}.", self.stream_id.clone(), @@ -346,214 +221,184 @@ impl IggyProducer { }); } - pub async fn send(&self, messages: Vec<IggyMessage>) -> Result<(), IggyError> { - if messages.is_empty() { - trace!("No messages to send."); + async fn send_internal( + &self, + stream: &Identifier, + topic: &Identifier, + mut msgs: Vec<IggyMessage>, + partitioning: Option<Arc<Partitioning>>, + ) -> Result<(), IggyError> { + if msgs.is_empty() { return Ok(()); } - if self.can_send_immediately { - return self - .send_immediately(&self.stream_id, &self.topic_id, messages, None) - .await; - } + self.encrypt_messages(&mut msgs)?; - self.send_buffered( - self.stream_id.clone(), - self.topic_id.clone(), - messages, - None, - ) - .await - } + let part = self.get_partitioning(stream, topic, &msgs, partitioning)?; - // todod добавить канал для считывания - pub async fn send_async(&self, messages: Vec<IggyMessage>) { - let sender = self.sender.clone(); - sender.unwrap().send_async(messages).await.unwrap(); - } + let max = self.batch_length.unwrap_or(MAX_BATCH_LENGTH); - pub async fn send_one(&self, message: IggyMessage) -> Result<(), IggyError> { - self.send(vec![message]).await + if self.can_send_immediately && self.linger_time_micros > 0 { + Self::wait_before_sending(self.linger_time_micros, self.last_sent_at.load(ORDERING)) + .await; + } + + for chunk in msgs.chunks_mut(max) { + self.last_sent_at + .store(IggyTimestamp::now().into(), ORDERING); + self.try_send_messages(stream, topic, &part, chunk).await?; + } + Ok(()) } - pub async fn send_with_partitioning( + async fn try_send_messages( &self, - messages: Vec<IggyMessage>, - partitioning: Option<Arc<Partitioning>>, + stream: &Identifier, + topic: &Identifier, + partitioning: &Arc<Partitioning>, + messages: &mut [IggyMessage], ) -> Result<(), IggyError> { - if messages.is_empty() { - trace!("No messages to send."); - return Ok(()); - } + let client = self.client.read().await; + let Some(max_retries) = self.send_retries_count else { + return client + .send_messages(stream, topic, partitioning, messages) + .await; + }; - if self.can_send_immediately { - return self - .send_immediately(&self.stream_id, &self.topic_id, messages, partitioning) + if max_retries == 0 { + return client + .send_messages(stream, topic, partitioning, messages) .await; } - self.send_buffered( - self.stream_id.clone(), - self.topic_id.clone(), - messages, + let mut timer = if let Some(interval) = self.send_retries_interval { + let mut timer = tokio::time::interval(interval.get_duration()); + timer.tick().await; + Some(timer) + } else { + None + }; + + self.wait_until_connected(max_retries, stream, topic, &mut timer) + .await?; + self.send_with_retries( + max_retries, + stream, + topic, partitioning, + messages, + &mut timer, ) .await } - pub async fn send_to( + async fn wait_until_connected( &self, - stream: Arc<Identifier>, - topic: Arc<Identifier>, - messages: Vec<IggyMessage>, - partitioning: Option<Arc<Partitioning>>, + max_retries: u32, + stream: &Identifier, + topic: &Identifier, + timer: &mut Option<Interval>, ) -> Result<(), IggyError> { - if messages.is_empty() { - trace!("No messages to send."); - return Ok(()); - } + let mut retries = 0; + while !self.can_send.load(ORDERING) { + retries += 1; + if retries > max_retries { + error!( + "Failed to send messages to topic: {topic}, stream: {stream} \ + after {max_retries} retries. Client is disconnected." + ); + return Err(IggyError::CannotSendMessagesDueToClientDisconnection); + } - if self.can_send_immediately { - return self - .send_immediately(&self.stream_id, &self.topic_id, messages, partitioning) - .await; - } + error!( + "Trying to send messages to topic: {topic}, stream: {stream} \ + but the client is disconnected. Retrying {retries}/{max_retries}..." + ); - self.send_buffered(stream, topic, messages, partitioning) - .await + if let Some(timer) = timer.as_mut() { + trace!( + "Waiting for the next retry to send messages to topic: {topic}, \ + stream: {stream} for disconnected client..." + ); + timer.tick().await; + } + } + Ok(()) } - // TODO add batch_size - async fn send_buffered( + async fn send_with_retries( &self, - stream: Arc<Identifier>, - topic: Arc<Identifier>, - mut messages: Vec<IggyMessage>, - partitioning: Option<Arc<Partitioning>>, + max_retries: u32, + stream: &Identifier, + topic: &Identifier, + partitioning: &Arc<Partitioning>, + messages: &mut [IggyMessage], + timer: &mut Option<Interval>, ) -> Result<(), IggyError> { - self.encrypt_messages(&mut messages)?; - let default_partitioning = self.default_partitioning.clone(); - let partitioner = self.partitioner.clone(); - let partitioning = get_partitioning( - &partitioner, - &stream, - &topic, - &messages, - partitioning, - &self.partitioning, - default_partitioning, - )?; - let batch_length = self.batch_length.unwrap_or(MAX_BATCH_LENGTH); - let batches = messages.chunks_mut(batch_length); - let mut current_batch = 1; - let batches_count = batches.len(); - for batch in batches { - if self.linger_time_micros > 0 { - Self::wait_before_sending( - self.linger_time_micros, - self.last_sent_at.load(ORDERING), - ) - .await; - } + let client = self.client.read().await; + let mut retries = 0; + loop { + match client + .send_messages(stream, topic, partitioning, messages) + .await + { + Ok(_) => return Ok(()), + Err(error) => { + retries += 1; + if retries > max_retries { + error!( + "Failed to send messages to topic: {topic}, stream: {stream} \ + after {max_retries} retries. {error}." + ); + return Err(error); + } - let messages_count = batch.len(); - trace!( - "Sending {messages_count} messages ({current_batch}/{batches_count} batch(es))..." - ); - self.last_sent_at - .store(IggyTimestamp::now().into(), ORDERING); + error!( + "Failed to send messages to topic: {topic}, stream: {stream}. \ + {error} Retrying {retries}/{max_retries}..." + ); - let client = self.client.clone(); - let send_retries_count = self.send_retries_count.clone(); - let send_retries_interval = self.send_retries_interval.clone(); - let can_send = self.can_send.clone(); - - let sender = self.sender.clone(); - let send_mode = self.send_mode.clone(); - try_send_messages_new( - BackpressureMode::Block, - sender, - self.error_callback.clone(), - send_mode, - client, - send_retries_count, - send_retries_interval, - can_send, - &self.stream_id, - &self.topic_id, - &partitioning, - batch, - ) - .await?; + if let Some(t) = timer.as_mut() { + trace!( + "Waiting for the next retry to send messages to topic: {topic}, \ + stream: {stream}..." + ); + t.tick().await; + } + } + } + } + } - trace!("Sent {messages_count} messages ({current_batch}/{batches_count} batch(es))."); - current_batch += 1; + fn encrypt_messages(&self, messages: &mut [IggyMessage]) -> Result<(), IggyError> { + if let Some(encryptor) = &self.encryptor { + for message in messages { + message.payload = Bytes::from(encryptor.encrypt(&message.payload)?); + message.header.payload_length = message.payload.len() as u32; + } } Ok(()) } - async fn send_immediately( + fn get_partitioning( &self, stream: &Identifier, topic: &Identifier, - mut messages: Vec<IggyMessage>, + messages: &[IggyMessage], partitioning: Option<Arc<Partitioning>>, - ) -> Result<(), IggyError> { - trace!("No batch size specified, sending messages immediately."); - self.encrypt_messages(&mut messages)?; - let default_partitioning = self.default_partitioning.clone(); - let partitioner = self.partitioner.clone(); - let client = self.client.clone(); - let send_retries_count = self.send_retries_count.clone(); - let send_retries_interval = self.send_retries_interval.clone(); - let can_send = self.can_send.clone(); - - let partitioning = get_partitioning( - &partitioner, - &stream, - &topic, - &messages, - partitioning, - &self.partitioning, - default_partitioning, - )?; - let batch_length = self.batch_length.unwrap_or(MAX_BATCH_LENGTH); - if messages.len() <= batch_length { - self.last_sent_at - .store(IggyTimestamp::now().into(), ORDERING); - try_send_messages( - client.clone(), - send_retries_count, - send_retries_interval, - can_send, - stream, - topic, - &partitioning, - &mut messages, - ) - .await?; - return Ok(()); + ) -> Result<Arc<Partitioning>, IggyError> { + if let Some(partitioner) = &self.partitioner { + trace!("Calculating partition id using custom partitioner."); + let partition_id = partitioner.calculate_partition_id(stream, topic, messages)?; + Ok(Arc::new(Partitioning::partition_id(partition_id))) + } else { + trace!("Using the provided partitioning."); + Ok(partitioning.unwrap_or_else(|| { + self.partitioning + .clone() + .unwrap_or_else(|| self.default_partitioning.clone()) + })) } - - for batch in messages.chunks_mut(batch_length) { - let client = client.clone(); - let can_send = can_send.clone(); - self.last_sent_at - .store(IggyTimestamp::now().into(), ORDERING); - try_send_messages( - client, - send_retries_count, - send_retries_interval, - can_send, - stream, - topic, - &partitioning, - batch, - ) - .await?; - } - Ok(()) } async fn wait_before_sending(interval: u64, last_sent_at: u64) { @@ -574,15 +419,160 @@ impl IggyProducer { ); sleep(Duration::from_micros(remaining)).await; } +} + + +pub trait ErrorCallback: Send + Sync + Debug { + fn call(&self, error: IggyError, messages: Vec<IggyMessage>); +} + +unsafe impl Send for IggyProducer {} +unsafe impl Sync for IggyProducer {} + +pub struct IggyProducer { + core: Arc<ProducerCore>, + send_mode: SendMode, +} + +impl IggyProducer { + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( + client: IggySharedMut<Box<dyn Client>>, + stream: Identifier, + stream_name: String, + topic: Identifier, + topic_name: String, + batch_length: Option<usize>, + partitioning: Option<Partitioning>, + encryptor: Option<Arc<EncryptorKind>>, + partitioner: Option<Arc<dyn Partitioner>>, + linger_time: Option<IggyDuration>, + create_stream_if_not_exists: bool, + create_topic_if_not_exists: bool, + topic_partitions_count: u32, + topic_replication_factor: Option<u8>, + topic_message_expiry: IggyExpiry, + topic_max_size: MaxTopicSize, + send_retries_count: Option<u32>, + send_retries_interval: Option<IggyDuration>, + send_mode: SendMode, + error_callback: Option<Arc<dyn ErrorCallback>>, + ) -> Self { + let core = Arc::new(ProducerCore { + initialized: AtomicBool::new(true), + client: Arc::new(client), + can_send: Arc::new(AtomicBool::new(true)), + stream_id: Arc::new(stream), + stream_name, + topic_id: Arc::new(topic), + topic_name, + batch_length, + partitioning: partitioning.map(Arc::new), + encryptor, + partitioner, + linger_time_micros: linger_time.map_or(0, |i| i.as_micros()), + create_stream_if_not_exists, + create_topic_if_not_exists, + topic_partitions_count, + topic_replication_factor, + topic_message_expiry, + topic_max_size, + default_partitioning: Arc::new(Partitioning::balanced()), + can_send_immediately: linger_time.is_none(), + last_sent_at: Arc::new(AtomicU64::new(0)), + send_retries_count, + send_retries_interval, + _join_handle: None, + sema: Arc::new(Semaphore::new(10)), + sender: None, + error_callback, + shard_number: default_shard_count(), + }); + // todo поставить количество шардов по-умолчанию = num_cpu + let num_shards = default_shard_count(); // todo потом заменмить на норм значепние + let config = BackgroundConfig{ + max_in_flight: 4, + in_flight_timeout: None, + batch_size: None, + failure_mode: BackpressureMode::Block, + }; + let dispatcher = Dispatcher::new(core.clone(), num_shards, config); + + + Self { + core, + send_mode, + } + } + + pub fn stream(&self) -> &Identifier { + &self.core.stream_id + } + + pub fn topic(&self) -> &Identifier { + &self.core.topic_id + } + + /// Initializes the producer by subscribing to diagnostic events, creating the stream and topic if they do not exist etc. + /// + /// Note: This method must be invoked before producing messages. + pub async fn init(&mut self) -> Result<(), IggyError> { + self.core.init().await + } + + pub async fn send(&self, messages: Vec<IggyMessage>) -> Result<(), IggyError> { + if messages.is_empty() { + trace!("No messages to send."); + return Ok(()); + } + + match &self.send_mode { + SendMode::Sync => { + let stream_id = &self.core.stream_id; + let topic_id = &self.core.topic_id; + self.core.send_internal(stream_id, topic_id, messages, None).await - fn encrypt_messages(&self, messages: &mut [IggyMessage]) -> Result<(), IggyError> { - if let Some(encryptor) = &self.encryptor { - for message in messages { - message.payload = Bytes::from(encryptor.encrypt(&message.payload)?); - message.header.payload_length = message.payload.len() as u32; } + // SendMode::Background(cfg) => { + + // } } - Ok(()) + + } + + pub async fn send_one(&self, message: IggyMessage) -> Result<(), IggyError> { + self.send(vec![message]).await + } + + pub async fn send_with_partitioning( + &self, + messages: Vec<IggyMessage>, + partitioning: Option<Arc<Partitioning>>, + ) -> Result<(), IggyError> { + if messages.is_empty() { + trace!("No messages to send."); + return Ok(()); + } + + let stream_id = &self.core.stream_id; + let topic_id = &self.core.topic_id; + + self.core.send_internal(stream_id, topic_id, messages, partitioning).await + } + + pub async fn send_to( + &self, + stream: Arc<Identifier>, + topic: Arc<Identifier>, + messages: Vec<IggyMessage>, + partitioning: Option<Arc<Partitioning>>, + ) -> Result<(), IggyError> { + if messages.is_empty() { + trace!("No messages to send."); + return Ok(()); + } + + self.core.send_internal(&stream, &topic, messages, partitioning).await } } diff --git a/core/sdk/src/clients/send_mode.rs b/core/sdk/src/clients/send_mode.rs index 98b3abef..a42ed2e8 100644 --- a/core/sdk/src/clients/send_mode.rs +++ b/core/sdk/src/clients/send_mode.rs @@ -1,4 +1,10 @@ -use iggy_common::IggyDuration; +use std::{path::Display, sync::{atomic::{AtomicUsize, Ordering}, Arc}}; + +use iggy_common::{IggyDuration, IggyMessage}; +use tokio::task::JoinHandle; +use dashmap::DashMap; + +use super::producer::ProducerCore; #[derive(Debug, Clone, Default)] pub enum SendMode { @@ -25,3 +31,54 @@ pub struct BackgroundConfig { pub batch_size: Option<usize>, pub failure_mode: BackpressureMode, } + +struct shard { + core: Arc<ProducerCore>, + tx: flume::Sender<Vec<IggyMessage>>, + _join_handle: JoinHandle<()>, +} + +impl shard { + fn new(id: usize, core: Arc<ProducerCore>) -> Self { + let (tx, rx) = flume::bounded::<Vec<IggyMessage>>(10); // todo добавить размер в конфигурацию + let core = core.clone(); + let handle = tokio::spawn(async move { + while let Ok(message) = rx.recv_async().await { // todo поменять на match + core.send_internal(); + } + }); + Self { core, tx, _join_handle: handle } + } + + async fn send(&self, messages: Vec<IggyMessage>) -> Result<(), flume::SendError<Vec<IggyMessage>>> { + self.tx.send(messages) + } +} + +pub struct Dispatcher { + config: BackgroundConfig, + sender: flume::Sender<Vec<IggyMessage>>, + _join_handle: JoinHandle<()>, +} + +impl Dispatcher { + pub async fn new(core: Arc<ProducerCore>, num_shards: usize, config: BackgroundConfig) -> Self { + let shards = DashMap::with_capacity(num_shards); + for i in 0..num_shards { + shards.insert(i, shard::new(i, core.clone())); + } + + let (tx, rx) = flume::bounded::<Vec<IggyMessage>>(0); + let sent = AtomicUsize::new(0); + let handle = tokio::spawn(async move { + loop { + if let Ok(msg) = rx.recv_async().await { + let ix = sent.fetch_add(1, Ordering::SeqCst) % num_shards; + let shard = shards.get(&ix).unwrap(); + shard.send(msg).await.unwrap(); + } + } + }); + Self{ config, sender: tx, _join_handle: handle } + } +}
