This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch feat/add-background-send
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/feat/add-background-send by 
this push:
     new 1685b879 del
1685b879 is described below

commit 1685b8798cd95f7b1b9db2784866d0e0f79d47ed
Author: haze518 <[email protected]>
AuthorDate: Thu May 29 06:06:08 2025 +0600

    del
---
 core/sdk/src/clients/producer.rs | 133 +++++++++++++++++++++------------------
 1 file changed, 71 insertions(+), 62 deletions(-)

diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index ba2ccc94..d9355358 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -1,4 +1,4 @@
-use super::send_mode::{shardMessage, BackgroundConfig, BackpressureMode, Dispatcher, SendMode};
+use super::send_mode::{BackgroundConfig, BackpressureMode, Dispatcher, SendMode, shardMessage};
 /* Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -36,6 +36,20 @@ use tokio::task::JoinHandle;
 use tokio::time::{Interval, sleep};
 use tracing::{error, info, trace, warn};
 
+pub trait SendStrategy: Send + Sync + 'static {
+    fn send_batch(
+        &self,
+        stream: &Identifier,
+        topic: &Identifier,
+        msgs: Vec<IggyMessage>,
+        partitioning: Option<Arc<Partitioning>>,
+    ) -> impl Future<Output = Result<(), IggyError>> + Send;
+}
+
+pub trait AsyncSendStrategy: SendStrategy {
+    fn flush(&self) -> Result<(), IggyError>;
+}
+
 pub struct ProducerCore {
     initialized: AtomicBool,
     can_send: Arc<AtomicBool>,
@@ -70,10 +84,11 @@ pub struct ProducerCore {
 
 impl ProducerCore {
     pub async fn init(&self) -> Result<(), IggyError> {
-        if self.initialized.compare_exchange(
-            false, true,
-            Ordering::SeqCst, Ordering::SeqCst,
-        ).is_err() {
+        if self
+            .initialized
+            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
+            .is_err()
+        {
             return Ok(());
         }
 
@@ -376,7 +391,6 @@ impl ProducerCore {
     }
 }
 
-
 pub trait ErrorCallback: Send + Sync + Debug {
     fn call(&self, error: IggyError, messages: Vec<IggyMessage>);
 }
@@ -415,37 +429,37 @@ impl IggyProducer {
         error_callback: Option<Arc<dyn ErrorCallback>>,
     ) -> Self {
         let core = Arc::new(ProducerCore {
-                initialized: AtomicBool::new(true),
-                client: Arc::new(client),
-                can_send: Arc::new(AtomicBool::new(true)),
-                stream_id: Arc::new(stream),
-                stream_name,
-                topic_id: Arc::new(topic),
-                topic_name,
-                batch_length,
-                partitioning: partitioning.map(Arc::new),
-                encryptor,
-                partitioner,
-                linger_time_micros: linger_time.map_or(0, |i| i.as_micros()),
-                create_stream_if_not_exists,
-                create_topic_if_not_exists,
-                topic_partitions_count,
-                topic_replication_factor,
-                topic_message_expiry,
-                topic_max_size,
-                default_partitioning: Arc::new(Partitioning::balanced()),
-                can_send_immediately: linger_time.is_none(),
-                last_sent_at: Arc::new(AtomicU64::new(0)),
-                send_retries_count,
-                send_retries_interval,
-                _join_handle: None,
-                sema: Arc::new(Semaphore::new(10)),
-                sender: None,
-                error_callback,
-                shard_number: default_shard_count(),
-            });
+            initialized: AtomicBool::new(true),
+            client: Arc::new(client),
+            can_send: Arc::new(AtomicBool::new(true)),
+            stream_id: Arc::new(stream),
+            stream_name,
+            topic_id: Arc::new(topic),
+            topic_name,
+            batch_length,
+            partitioning: partitioning.map(Arc::new),
+            encryptor,
+            partitioner,
+            linger_time_micros: linger_time.map_or(0, |i| i.as_micros()),
+            create_stream_if_not_exists,
+            create_topic_if_not_exists,
+            topic_partitions_count,
+            topic_replication_factor,
+            topic_message_expiry,
+            topic_max_size,
+            default_partitioning: Arc::new(Partitioning::balanced()),
+            can_send_immediately: linger_time.is_none(),
+            last_sent_at: Arc::new(AtomicU64::new(0)),
+            send_retries_count,
+            send_retries_interval,
+            _join_handle: None,
+            sema: Arc::new(Semaphore::new(10)),
+            sender: None,
+            error_callback,
+            shard_number: default_shard_count(),
+        });
         let num_shards = default_shard_count(); // todo потом заменмить на норм значепние
-        let config = BackgroundConfig{
+        let config = BackgroundConfig {
             max_in_flight: 4,
             in_flight_timeout: None,
             batch_size: None,
@@ -489,24 +503,23 @@ impl IggyProducer {
 
         match &self.send_mode {
             SendMode::Sync => {
-                self.core.send_internal(&stream_id, &topic_id, messages, None).await
-
-            }
-            SendMode::Background => {
-                match &self.dispatcher {
-                    Some(disp) => {
-                        disp.dispatch(shardMessage{
-                            messages,
-                            stream: stream_id,
-                            topic: topic_id,
-                            partitioning: None,
-                        }).await.map_err(|err| IggyError::BackgroundSendError)
-                    }
-                    None => Ok(())
-                }
+                self.core
+                    .send_internal(&stream_id, &topic_id, messages, None)
+                    .await
             }
+            SendMode::Background => match &self.dispatcher {
+                Some(disp) => disp
+                    .dispatch(shardMessage {
+                        messages,
+                        stream: stream_id,
+                        topic: topic_id,
+                        partitioning: None,
+                    })
+                    .await
+                    .map_err(|err| IggyError::BackgroundSendError),
+                None => Ok(()),
+            },
         }
-
     }
 
     pub async fn send_one(&self, message: IggyMessage) -> Result<(), IggyError> {
@@ -526,7 +539,9 @@ impl IggyProducer {
         let stream_id = &self.core.stream_id;
         let topic_id = &self.core.topic_id;
 
-        self.core.send_internal(stream_id, topic_id, messages, partitioning).await
+        self.core
+            .send_internal(stream_id, topic_id, messages, partitioning)
+            .await
     }
 
     pub async fn send_to(
@@ -541,7 +556,9 @@ impl IggyProducer {
             return Ok(());
         }
 
-        self.core.send_internal(&stream, &topic, messages, partitioning).await
+        self.core
+            .send_internal(&stream, &topic, messages, partitioning)
+            .await
     }
 }
 
@@ -549,11 +566,3 @@ fn default_shard_count() -> usize {
     let cpus = num_cpus::get();
     cpus.clamp(2, 16)
 }
-
-fn default_sharder(msg: &IggyMessage) -> &[u8] {
-    if let Some(h) = &msg.user_headers {
-        return &h[..h.len().min(16)];
-    }
-
-    &msg.payload[..msg.payload.len().min(16)]
-}

Reply via email to