This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch feat/add-background-send
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 08854aaa46dd6ad0ef8efbacbc69a9676fb7237a
Author: haze518 <[email protected]>
AuthorDate: Wed May 28 06:21:37 2025 +0600

    del
---
 core/sdk/src/clients/producer.rs  | 700 +++++++++++++++++++-------------------
 core/sdk/src/clients/send_mode.rs |  59 +++-
 2 files changed, 403 insertions(+), 356 deletions(-)

diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index 9ba3026d..a8c106a9 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -1,4 +1,4 @@
-use super::send_mode::{BackpressureMode, SendMode};
+use super::send_mode::{BackgroundConfig, BackpressureMode, Dispatcher, 
SendMode};
 /* Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,6 +19,7 @@ use super::send_mode::{BackpressureMode, SendMode};
 use super::{MAX_BATCH_LENGTH, ORDERING};
 
 use bytes::Bytes;
+use dashmap::DashMap;
 use futures_util::StreamExt;
 use iggy_binary_protocol::Client;
 use iggy_common::locking::{IggySharedMut, IggySharedMutFn};
@@ -28,41 +29,17 @@ use iggy_common::{
 };
 use std::fmt::Debug;
 use std::sync::Arc;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering;
 use std::sync::atomic::{AtomicBool, AtomicU64};
 use std::time::{Duration, Instant};
 use tokio::sync::Semaphore;
 use tokio::task::JoinHandle;
 use tokio::time::{Interval, sleep};
 use tracing::{error, info, trace, warn};
-use dashmap::DashMap;
-use std::sync::atomic::AtomicUsize;
-use std::sync::atomic::Ordering;
-
-struct shard {
-    tx: flume::Sender<Vec<IggyMessage>>,
-    _join_handle: JoinHandle<()>,
-}
-
-impl shard {
-    fn new(id: usize) -> Self {
-        let (tx, rx) = flume::bounded::<Vec<IggyMessage>>(10); // todo 
добавить размер в конфигурацию
-        let handle = tokio::spawn(async move {
-            while let Ok(message) = rx.recv_async().await { // todo поменять 
на match
-                
-            }
-        });
-    }
-}
-
-pub trait ErrorCallback: Send + Sync + Debug {
-    fn call(&self, error: IggyError, messages: Vec<IggyMessage>);
-}
-
-unsafe impl Send for IggyProducer {}
-unsafe impl Sync for IggyProducer {}
 
-pub struct IggyProducer {
-    initialized: bool,
+pub(crate) struct ProducerCore {
+    initialized: AtomicBool,
     can_send: Arc<AtomicBool>,
     client: Arc<IggySharedMut<Box<dyn Client>>>,
     stream_id: Arc<Identifier>,
@@ -71,7 +48,6 @@ pub struct IggyProducer {
     topic_name: String,
     batch_length: Option<usize>,
     partitioning: Option<Arc<Partitioning>>,
-    send_mode: Arc<SendMode>,
     encryptor: Option<Arc<EncryptorKind>>,
     partitioner: Option<Arc<dyn Partitioner>>,
     linger_time_micros: u64,
@@ -92,79 +68,14 @@ pub struct IggyProducer {
     sender: Option<Arc<flume::Sender<Vec<IggyMessage>>>>,
     error_callback: Option<Arc<dyn ErrorCallback>>,
     shard_number: usize,
-    // todo добавить ShardStrategy
 }
 
-impl IggyProducer {
-    #[allow(clippy::too_many_arguments)]
-    pub(crate) fn new(
-        client: IggySharedMut<Box<dyn Client>>,
-        stream: Identifier,
-        stream_name: String,
-        topic: Identifier,
-        topic_name: String,
-        batch_length: Option<usize>,
-        partitioning: Option<Partitioning>,
-        encryptor: Option<Arc<EncryptorKind>>,
-        partitioner: Option<Arc<dyn Partitioner>>,
-        linger_time: Option<IggyDuration>,
-        create_stream_if_not_exists: bool,
-        create_topic_if_not_exists: bool,
-        topic_partitions_count: u32,
-        topic_replication_factor: Option<u8>,
-        topic_message_expiry: IggyExpiry,
-        topic_max_size: MaxTopicSize,
-        send_retries_count: Option<u32>,
-        send_retries_interval: Option<IggyDuration>,
-        send_mode: SendMode,
-        error_callback: Option<Arc<dyn ErrorCallback>>,
-    ) -> Self {
-        Self {
-            initialized: false,
-            client: Arc::new(client),
-            can_send: Arc::new(AtomicBool::new(true)),
-            stream_id: Arc::new(stream),
-            stream_name,
-            topic_id: Arc::new(topic),
-            topic_name,
-            batch_length,
-            partitioning: partitioning.map(Arc::new),
-            send_mode: Arc::new(send_mode),
-            encryptor,
-            partitioner,
-            linger_time_micros: linger_time.map_or(0, |i| i.as_micros()),
-            create_stream_if_not_exists,
-            create_topic_if_not_exists,
-            topic_partitions_count,
-            topic_replication_factor,
-            topic_message_expiry,
-            topic_max_size,
-            default_partitioning: Arc::new(Partitioning::balanced()),
-            can_send_immediately: linger_time.is_none(),
-            last_sent_at: Arc::new(AtomicU64::new(0)),
-            send_retries_count,
-            send_retries_interval,
-            _join_handle: None,
-            sema: Arc::new(Semaphore::new(10)),
-            sender: None,
-            error_callback,
-            shard_number: default_shard_count(),
-        }
-    }
-
-    pub fn stream(&self) -> &Identifier {
-        &self.stream_id
-    }
-
-    pub fn topic(&self) -> &Identifier {
-        &self.topic_id
-    }
-
-    /// Initializes the producer by subscribing to diagnostic events, creating 
the stream and topic if they do not exist etc.
-    ///
-    /// Note: This method must be invoked before producing messages.
-    pub async fn init(&mut self) -> Result<(), IggyError> {
-        if self.initialized {
+impl ProducerCore {
+    pub async fn init(&self) -> Result<(), IggyError> {
+        if self.initialized.compare_exchange(
+            false, true,
+            Ordering::SeqCst, Ordering::SeqCst,
+        ).is_err() {
             return Ok(());
         }
 
@@ -222,85 +133,49 @@ impl IggyProducer {
                 .await?;
         }
 
-        let (tx, rx) = flume::bounded::<Vec<IggyMessage>>(0); // todo задать 
какое-то значение
-        let stream_id = self.stream_id.clone();
-        let topic_id = self.topic_id.clone();
-        let partitioning = self.partitioning.clone();
-        let default_partitioning = self.default_partitioning.clone();
-        let partitioner = self.partitioner.clone();
-        let client = self.client.clone();
-        let send_retries_count = self.send_retries_count.clone();
-        let send_retries_interval = self.send_retries_interval.clone();
-        let can_send = self.can_send.clone();
-        let send_batches = AtomicUsize::new(0);
-        let num_shard = self.shard_number;
-        // let sema = self.sema.clone();
-
-        let handle = tokio::spawn(async move {
-            while let Ok(mut batch) = rx.recv_async().await {
-                // let sema = sema.clone();
-                // let partitioner = partitioner.clone();
-                // let stream_id = stream_id.clone();
-                // let topic_id = topic_id.clone();
-                // let partitioning = partitioning.clone();
-                // let default_partitioning = default_partitioning.clone();
-                // let client = client.clone();
-                // let can_send = can_send.clone();
-
-                // if round_robin
-                let shard_ix = send_batches.fetch_add(1, Ordering::SeqCst) % 
num_shard;
-
-
-                let partitioning = get_partitioning(
-                    &partitioner,
-                    &stream_id,
-                    &topic_id,
-                    &batch,
-                    partitioning.clone(),
-                    &partitioning,
-                    default_partitioning.clone(),
-                )
-                .unwrap();
-                try_send_messages(
-                    client.clone(),
-                    send_retries_count,
-                    send_retries_interval,
-                    can_send.clone(),
-                    &stream_id,
-                    &topic_id,
-                    &partitioning,
-                    &mut batch,
-                )
-                .await
-                .unwrap();
-                // tokio::spawn(async move {
-                //     let _permit = sema.acquire().await.unwrap();
-                //     let partitioning = get_partitioning(
-                //         &partitioner,
-                //         &stream_id,
-                //         &topic_id,
-                //         &batch,
-                //         partitioning.clone(),
-                //         &partitioning,
-                //         default_partitioning.clone(),
-                //     )
-                //     .unwrap();
-                //     try_send_messages(
-                //         client.clone(),
-                //         send_retries_count,
-                //         send_retries_interval,
-                //         can_send.clone(),
-                //         &stream_id,
-                //         &topic_id,
-                //         &partitioning,
-                //         &mut batch,
-                //     ).await.unwrap();
-                // });
-            }
-        });
-        self._join_handle = Some(handle);
-        self.sender = Some(Arc::new(tx));
-        self.initialized = true;
+        // let (tx, rx) = flume::bounded::<Vec<IggyMessage>>(0); // TODO: set an actual capacity value
+        // let stream_id = self.stream_id.clone();
+        // let topic_id = self.topic_id.clone();
+        // let partitioning = self.partitioning.clone();
+        // let default_partitioning = self.default_partitioning.clone();
+        // let partitioner = self.partitioner.clone();
+        // let client = self.client.clone();
+        // let send_retries_count = self.send_retries_count.clone();
+        // let send_retries_interval = self.send_retries_interval.clone();
+        // let send_batches = AtomicUsize::new(0);
+        // let num_shard = self.shard_number;
+        // let can_send = self.can_send.clone();
+
+        // let handle = tokio::spawn(async move {
+        //     while let Ok(mut batch) = rx.recv_async().await {
+        //         // if round_robin
+        //         let shard_ix = send_batches.fetch_add(1, Ordering::SeqCst) 
% num_shard;
+        //         let partitioning = get_partitioning(
+        //             &partitioner,
+        //             &stream_id,
+        //             &topic_id,
+        //             &batch,
+        //             partitioning.clone(),
+        //             &partitioning,
+        //             default_partitioning.clone(),
+        //         )
+        //         .unwrap();
+        //         try_send_messages(
+        //             client.clone(),
+        //             send_retries_count,
+        //             send_retries_interval,
+        //             can_send.clone(),
+        //             &stream_id,
+        //             &topic_id,
+        //             &partitioning,
+        //             &mut batch,
+        //         )
+        //         .await
+        //         .unwrap();
+        //     }
+        // });
+        // self._join_handle = Some(handle);
+        // self.sender = Some(Arc::new(tx));
         info!(
             "Producer has been initialized for stream: {} and topic: {}.",
             self.stream_id.clone(),
@@ -346,214 +221,184 @@ impl IggyProducer {
         });
     }
 
-    pub async fn send(&self, messages: Vec<IggyMessage>) -> Result<(), 
IggyError> {
-        if messages.is_empty() {
-            trace!("No messages to send.");
+    async fn send_internal(
+        &self,
+        stream: &Identifier,
+        topic: &Identifier,
+        mut msgs: Vec<IggyMessage>,
+        partitioning: Option<Arc<Partitioning>>,
+    ) -> Result<(), IggyError> {
+        if msgs.is_empty() {
             return Ok(());
         }
 
-        if self.can_send_immediately {
-            return self
-                .send_immediately(&self.stream_id, &self.topic_id, messages, 
None)
-                .await;
-        }
+        self.encrypt_messages(&mut msgs)?;
 
-        self.send_buffered(
-            self.stream_id.clone(),
-            self.topic_id.clone(),
-            messages,
-            None,
-        )
-        .await
-    }
+        let part = self.get_partitioning(stream, topic, &msgs, partitioning)?;
 
-    // todod добавить канал для считывания
-    pub async fn send_async(&self, messages: Vec<IggyMessage>) {
-        let sender = self.sender.clone();
-        sender.unwrap().send_async(messages).await.unwrap();
-    }
+        let max = self.batch_length.unwrap_or(MAX_BATCH_LENGTH);
 
-    pub async fn send_one(&self, message: IggyMessage) -> Result<(), 
IggyError> {
-        self.send(vec![message]).await
+        if self.can_send_immediately && self.linger_time_micros > 0 {
+            Self::wait_before_sending(self.linger_time_micros, 
self.last_sent_at.load(ORDERING))
+                .await;
+        }
+
+        for chunk in msgs.chunks_mut(max) {
+            self.last_sent_at
+                .store(IggyTimestamp::now().into(), ORDERING);
+            self.try_send_messages(stream, topic, &part, chunk).await?;
+        }
+        Ok(())
     }
 
-    pub async fn send_with_partitioning(
+    async fn try_send_messages(
         &self,
-        messages: Vec<IggyMessage>,
-        partitioning: Option<Arc<Partitioning>>,
+        stream: &Identifier,
+        topic: &Identifier,
+        partitioning: &Arc<Partitioning>,
+        messages: &mut [IggyMessage],
     ) -> Result<(), IggyError> {
-        if messages.is_empty() {
-            trace!("No messages to send.");
-            return Ok(());
-        }
+        let client = self.client.read().await;
+        let Some(max_retries) = self.send_retries_count else {
+            return client
+                .send_messages(stream, topic, partitioning, messages)
+                .await;
+        };
 
-        if self.can_send_immediately {
-            return self
-                .send_immediately(&self.stream_id, &self.topic_id, messages, 
partitioning)
+        if max_retries == 0 {
+            return client
+                .send_messages(stream, topic, partitioning, messages)
                 .await;
         }
 
-        self.send_buffered(
-            self.stream_id.clone(),
-            self.topic_id.clone(),
-            messages,
+        let mut timer = if let Some(interval) = self.send_retries_interval {
+            let mut timer = tokio::time::interval(interval.get_duration());
+            timer.tick().await;
+            Some(timer)
+        } else {
+            None
+        };
+
+        self.wait_until_connected(max_retries, stream, topic, &mut timer)
+            .await?;
+        self.send_with_retries(
+            max_retries,
+            stream,
+            topic,
             partitioning,
+            messages,
+            &mut timer,
         )
         .await
     }
 
-    pub async fn send_to(
+    async fn wait_until_connected(
         &self,
-        stream: Arc<Identifier>,
-        topic: Arc<Identifier>,
-        messages: Vec<IggyMessage>,
-        partitioning: Option<Arc<Partitioning>>,
+        max_retries: u32,
+        stream: &Identifier,
+        topic: &Identifier,
+        timer: &mut Option<Interval>,
     ) -> Result<(), IggyError> {
-        if messages.is_empty() {
-            trace!("No messages to send.");
-            return Ok(());
-        }
+        let mut retries = 0;
+        while !self.can_send.load(ORDERING) {
+            retries += 1;
+            if retries > max_retries {
+                error!(
+                    "Failed to send messages to topic: {topic}, stream: 
{stream} \
+                     after {max_retries} retries. Client is disconnected."
+                );
+                return 
Err(IggyError::CannotSendMessagesDueToClientDisconnection);
+            }
 
-        if self.can_send_immediately {
-            return self
-                .send_immediately(&self.stream_id, &self.topic_id, messages, 
partitioning)
-                .await;
-        }
+            error!(
+                "Trying to send messages to topic: {topic}, stream: {stream} \
+                 but the client is disconnected. Retrying 
{retries}/{max_retries}..."
+            );
 
-        self.send_buffered(stream, topic, messages, partitioning)
-            .await
+            if let Some(timer) = timer.as_mut() {
+                trace!(
+                    "Waiting for the next retry to send messages to topic: 
{topic}, \
+                     stream: {stream} for disconnected client..."
+                );
+                timer.tick().await;
+            }
+        }
+        Ok(())
     }
 
-    // TODO add batch_size
-    async fn send_buffered(
+    async fn send_with_retries(
         &self,
-        stream: Arc<Identifier>,
-        topic: Arc<Identifier>,
-        mut messages: Vec<IggyMessage>,
-        partitioning: Option<Arc<Partitioning>>,
+        max_retries: u32,
+        stream: &Identifier,
+        topic: &Identifier,
+        partitioning: &Arc<Partitioning>,
+        messages: &mut [IggyMessage],
+        timer: &mut Option<Interval>,
     ) -> Result<(), IggyError> {
-        self.encrypt_messages(&mut messages)?;
-        let default_partitioning = self.default_partitioning.clone();
-        let partitioner = self.partitioner.clone();
-        let partitioning = get_partitioning(
-            &partitioner,
-            &stream,
-            &topic,
-            &messages,
-            partitioning,
-            &self.partitioning,
-            default_partitioning,
-        )?;
-        let batch_length = self.batch_length.unwrap_or(MAX_BATCH_LENGTH);
-        let batches = messages.chunks_mut(batch_length);
-        let mut current_batch = 1;
-        let batches_count = batches.len();
-        for batch in batches {
-            if self.linger_time_micros > 0 {
-                Self::wait_before_sending(
-                    self.linger_time_micros,
-                    self.last_sent_at.load(ORDERING),
-                )
-                .await;
-            }
+        let client = self.client.read().await;
+        let mut retries = 0;
+        loop {
+            match client
+                .send_messages(stream, topic, partitioning, messages)
+                .await
+            {
+                Ok(_) => return Ok(()),
+                Err(error) => {
+                    retries += 1;
+                    if retries > max_retries {
+                        error!(
+                            "Failed to send messages to topic: {topic}, 
stream: {stream} \
+                             after {max_retries} retries. {error}."
+                        );
+                        return Err(error);
+                    }
 
-            let messages_count = batch.len();
-            trace!(
-                "Sending {messages_count} messages 
({current_batch}/{batches_count} batch(es))..."
-            );
-            self.last_sent_at
-                .store(IggyTimestamp::now().into(), ORDERING);
+                    error!(
+                        "Failed to send messages to topic: {topic}, stream: 
{stream}. \
+                         {error} Retrying {retries}/{max_retries}..."
+                    );
 
-            let client = self.client.clone();
-            let send_retries_count = self.send_retries_count.clone();
-            let send_retries_interval = self.send_retries_interval.clone();
-            let can_send = self.can_send.clone();
-
-            let sender = self.sender.clone();
-            let send_mode = self.send_mode.clone();
-            try_send_messages_new(
-                BackpressureMode::Block,
-                sender,
-                self.error_callback.clone(),
-                send_mode,
-                client,
-                send_retries_count,
-                send_retries_interval,
-                can_send,
-                &self.stream_id,
-                &self.topic_id,
-                &partitioning,
-                batch,
-            )
-            .await?;
+                    if let Some(t) = timer.as_mut() {
+                        trace!(
+                            "Waiting for the next retry to send messages to 
topic: {topic}, \
+                             stream: {stream}..."
+                        );
+                        t.tick().await;
+                    }
+                }
+            }
+        }
+    }
 
-            trace!("Sent {messages_count} messages 
({current_batch}/{batches_count} batch(es)).");
-            current_batch += 1;
+    fn encrypt_messages(&self, messages: &mut [IggyMessage]) -> Result<(), 
IggyError> {
+        if let Some(encryptor) = &self.encryptor {
+            for message in messages {
+                message.payload = 
Bytes::from(encryptor.encrypt(&message.payload)?);
+                message.header.payload_length = message.payload.len() as u32;
+            }
         }
         Ok(())
     }
 
-    async fn send_immediately(
+    fn get_partitioning(
         &self,
         stream: &Identifier,
         topic: &Identifier,
-        mut messages: Vec<IggyMessage>,
+        messages: &[IggyMessage],
         partitioning: Option<Arc<Partitioning>>,
-    ) -> Result<(), IggyError> {
-        trace!("No batch size specified, sending messages immediately.");
-        self.encrypt_messages(&mut messages)?;
-        let default_partitioning = self.default_partitioning.clone();
-        let partitioner = self.partitioner.clone();
-        let client = self.client.clone();
-        let send_retries_count = self.send_retries_count.clone();
-        let send_retries_interval = self.send_retries_interval.clone();
-        let can_send = self.can_send.clone();
-
-        let partitioning = get_partitioning(
-            &partitioner,
-            &stream,
-            &topic,
-            &messages,
-            partitioning,
-            &self.partitioning,
-            default_partitioning,
-        )?;
-        let batch_length = self.batch_length.unwrap_or(MAX_BATCH_LENGTH);
-        if messages.len() <= batch_length {
-            self.last_sent_at
-                .store(IggyTimestamp::now().into(), ORDERING);
-            try_send_messages(
-                client.clone(),
-                send_retries_count,
-                send_retries_interval,
-                can_send,
-                stream,
-                topic,
-                &partitioning,
-                &mut messages,
-            )
-            .await?;
-            return Ok(());
+    ) -> Result<Arc<Partitioning>, IggyError> {
+        if let Some(partitioner) = &self.partitioner {
+            trace!("Calculating partition id using custom partitioner.");
+            let partition_id = partitioner.calculate_partition_id(stream, 
topic, messages)?;
+            Ok(Arc::new(Partitioning::partition_id(partition_id)))
+        } else {
+            trace!("Using the provided partitioning.");
+            Ok(partitioning.unwrap_or_else(|| {
+                self.partitioning
+                    .clone()
+                    .unwrap_or_else(|| self.default_partitioning.clone())
+            }))
         }
-
-        for batch in messages.chunks_mut(batch_length) {
-            let client = client.clone();
-            let can_send = can_send.clone();
-            self.last_sent_at
-                .store(IggyTimestamp::now().into(), ORDERING);
-            try_send_messages(
-                client,
-                send_retries_count,
-                send_retries_interval,
-                can_send,
-                stream,
-                topic,
-                &partitioning,
-                batch,
-            )
-            .await?;
-        }
-        Ok(())
     }
 
     async fn wait_before_sending(interval: u64, last_sent_at: u64) {
@@ -574,15 +419,160 @@ impl IggyProducer {
         );
         sleep(Duration::from_micros(remaining)).await;
     }
+}
+
+
+pub trait ErrorCallback: Send + Sync + Debug {
+    fn call(&self, error: IggyError, messages: Vec<IggyMessage>);
+}
+
+unsafe impl Send for IggyProducer {}
+unsafe impl Sync for IggyProducer {}
+
+pub struct IggyProducer {
+    core: Arc<ProducerCore>,
+    send_mode: SendMode,
+}
+
+impl IggyProducer {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        client: IggySharedMut<Box<dyn Client>>,
+        stream: Identifier,
+        stream_name: String,
+        topic: Identifier,
+        topic_name: String,
+        batch_length: Option<usize>,
+        partitioning: Option<Partitioning>,
+        encryptor: Option<Arc<EncryptorKind>>,
+        partitioner: Option<Arc<dyn Partitioner>>,
+        linger_time: Option<IggyDuration>,
+        create_stream_if_not_exists: bool,
+        create_topic_if_not_exists: bool,
+        topic_partitions_count: u32,
+        topic_replication_factor: Option<u8>,
+        topic_message_expiry: IggyExpiry,
+        topic_max_size: MaxTopicSize,
+        send_retries_count: Option<u32>,
+        send_retries_interval: Option<IggyDuration>,
+        send_mode: SendMode,
+        error_callback: Option<Arc<dyn ErrorCallback>>,
+    ) -> Self {
+        let core = Arc::new(ProducerCore {
+                initialized: AtomicBool::new(true),
+                client: Arc::new(client),
+                can_send: Arc::new(AtomicBool::new(true)),
+                stream_id: Arc::new(stream),
+                stream_name,
+                topic_id: Arc::new(topic),
+                topic_name,
+                batch_length,
+                partitioning: partitioning.map(Arc::new),
+                encryptor,
+                partitioner,
+                linger_time_micros: linger_time.map_or(0, |i| i.as_micros()),
+                create_stream_if_not_exists,
+                create_topic_if_not_exists,
+                topic_partitions_count,
+                topic_replication_factor,
+                topic_message_expiry,
+                topic_max_size,
+                default_partitioning: Arc::new(Partitioning::balanced()),
+                can_send_immediately: linger_time.is_none(),
+                last_sent_at: Arc::new(AtomicU64::new(0)),
+                send_retries_count,
+                send_retries_interval,
+                _join_handle: None,
+                sema: Arc::new(Semaphore::new(10)),
+                sender: None,
+                error_callback,
+                shard_number: default_shard_count(),
+            });
+        // TODO: default the number of shards to the number of CPUs
+        let num_shards = default_shard_count(); // TODO: replace with a proper value later
+        let config = BackgroundConfig{
+            max_in_flight: 4,
+            in_flight_timeout: None,
+            batch_size: None,
+            failure_mode: BackpressureMode::Block,
+        };
+        let dispatcher = Dispatcher::new(core.clone(), num_shards, config);
+
+
+        Self {
+            core,
+            send_mode,
+        }
+    }
+
+    pub fn stream(&self) -> &Identifier {
+        &self.core.stream_id
+    }
+
+    pub fn topic(&self) -> &Identifier {
+        &self.core.topic_id
+    }
+
+    /// Initializes the producer by subscribing to diagnostic events, creating the stream and topic if they do not exist, etc.
+    ///
+    /// Note: This method must be invoked before producing messages.
+    pub async fn init(&mut self) -> Result<(), IggyError> {
+        self.core.init().await
+    }
+
+    pub async fn send(&self, messages: Vec<IggyMessage>) -> Result<(), 
IggyError> {
+        if messages.is_empty() {
+            trace!("No messages to send.");
+            return Ok(());
+        }
+
+        match &self.send_mode {
+            SendMode::Sync => {
+                let stream_id = &self.core.stream_id;
+                let topic_id = &self.core.topic_id;
+                self.core.send_internal(stream_id, topic_id, messages, 
None).await
 
-    fn encrypt_messages(&self, messages: &mut [IggyMessage]) -> Result<(), 
IggyError> {
-        if let Some(encryptor) = &self.encryptor {
-            for message in messages {
-                message.payload = 
Bytes::from(encryptor.encrypt(&message.payload)?);
-                message.header.payload_length = message.payload.len() as u32;
             }
+            // SendMode::Background(cfg) => {
+
+            // }
         }
-        Ok(())
+
+    }
+
+    pub async fn send_one(&self, message: IggyMessage) -> Result<(), 
IggyError> {
+        self.send(vec![message]).await
+    }
+
+    pub async fn send_with_partitioning(
+        &self,
+        messages: Vec<IggyMessage>,
+        partitioning: Option<Arc<Partitioning>>,
+    ) -> Result<(), IggyError> {
+        if messages.is_empty() {
+            trace!("No messages to send.");
+            return Ok(());
+        }
+
+        let stream_id = &self.core.stream_id;
+        let topic_id = &self.core.topic_id;
+
+        self.core.send_internal(stream_id, topic_id, messages, 
partitioning).await
+    }
+
+    pub async fn send_to(
+        &self,
+        stream: Arc<Identifier>,
+        topic: Arc<Identifier>,
+        messages: Vec<IggyMessage>,
+        partitioning: Option<Arc<Partitioning>>,
+    ) -> Result<(), IggyError> {
+        if messages.is_empty() {
+            trace!("No messages to send.");
+            return Ok(());
+        }
+
+        self.core.send_internal(&stream, &topic, messages, partitioning).await
     }
 }
 
diff --git a/core/sdk/src/clients/send_mode.rs 
b/core/sdk/src/clients/send_mode.rs
index 98b3abef..a42ed2e8 100644
--- a/core/sdk/src/clients/send_mode.rs
+++ b/core/sdk/src/clients/send_mode.rs
@@ -1,4 +1,10 @@
-use iggy_common::IggyDuration;
+use std::{path::Display, sync::{atomic::{AtomicUsize, Ordering}, Arc}};
+
+use iggy_common::{IggyDuration, IggyMessage};
+use tokio::task::JoinHandle;
+use dashmap::DashMap;
+
+use super::producer::ProducerCore;
 
 #[derive(Debug, Clone, Default)]
 pub enum SendMode {
@@ -25,3 +31,54 @@ pub struct BackgroundConfig {
     pub batch_size: Option<usize>,
     pub failure_mode: BackpressureMode,
 }
+
+struct shard {
+    core: Arc<ProducerCore>,
+    tx: flume::Sender<Vec<IggyMessage>>,
+    _join_handle: JoinHandle<()>,
+}
+
+impl shard {
+    fn new(id: usize, core: Arc<ProducerCore>) -> Self {
+        let (tx, rx) = flume::bounded::<Vec<IggyMessage>>(10); // TODO: make the channel size configurable
+        let core = core.clone();
+        let handle = tokio::spawn(async move {
+            while let Ok(message) = rx.recv_async().await { // TODO: switch to a `match`
+                core.send_internal();
+            }
+        });
+        Self { core, tx, _join_handle: handle }
+    }
+
+    async fn send(&self, messages: Vec<IggyMessage>) -> Result<(), 
flume::SendError<Vec<IggyMessage>>> {
+        self.tx.send(messages)
+    }
+}
+
+pub struct Dispatcher {
+    config: BackgroundConfig,
+    sender: flume::Sender<Vec<IggyMessage>>,
+    _join_handle: JoinHandle<()>,
+}
+
+impl Dispatcher {
+    pub async fn new(core: Arc<ProducerCore>, num_shards: usize, config: 
BackgroundConfig) -> Self {
+        let shards = DashMap::with_capacity(num_shards);
+        for i in 0..num_shards {
+            shards.insert(i, shard::new(i, core.clone()));
+        }
+
+        let (tx, rx) = flume::bounded::<Vec<IggyMessage>>(0);
+        let sent = AtomicUsize::new(0);
+        let handle = tokio::spawn(async move {
+            loop {
+                if let Ok(msg) = rx.recv_async().await {
+                    let ix = sent.fetch_add(1, Ordering::SeqCst) % num_shards;
+                    let shard = shards.get(&ix).unwrap();
+                    shard.send(msg).await.unwrap();
+                }
+            }
+        });
+        Self{ config, sender: tx, _join_handle: handle }
+    }
+}

Reply via email to