This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch add-sync-client
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/add-sync-client by this push:
     new 60363657 del
60363657 is described below

commit 60363657e94c40ada4bef92bda1b1720e3d3abbb
Author: haze518 <[email protected]>
AuthorDate: Sat Sep 27 14:03:02 2025 +0600

    del
---
 core/sdk/src/clients/mod.rs                 |   2 +
 core/sdk/src/clients/producer.rs            | 229 +++++++++++++++++-----------
 core/sdk/src/clients/producer_builder.rs    |   3 +-
 core/sdk/src/clients/producer_dispatcher.rs |  17 +--
 4 files changed, 153 insertions(+), 98 deletions(-)

diff --git a/core/sdk/src/clients/mod.rs b/core/sdk/src/clients/mod.rs
index 9f51cc52..df84b3b6 100644
--- a/core/sdk/src/clients/mod.rs
+++ b/core/sdk/src/clients/mod.rs
@@ -33,8 +33,10 @@ pub mod consumer_builder;
 pub mod producer;
 pub mod producer_builder;
 pub mod producer_config;
+// #[cfg(feature = "async")]
 pub mod producer_dispatcher;
 pub mod producer_error_callback;
+// #[cfg(feature = "async")]
 pub mod producer_sharding;
 
 const ORDERING: std::sync::atomic::Ordering = 
std::sync::atomic::Ordering::SeqCst;
diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index 47b31308..6d235191 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -18,7 +18,6 @@
 use super::ORDERING;
 use crate::client_wrappers::ClientWrapper;
 use crate::clients::MAX_BATCH_LENGTH;
-use crate::clients::producer_builder::SendMode;
 use crate::clients::producer_config::DirectConfig;
 use crate::clients::producer_dispatcher::ProducerDispatcher;
 use crate::runtime::{Interval, Runtime};
@@ -36,6 +35,83 @@ use std::sync::atomic::{AtomicBool, AtomicU64};
 use std::time::Duration;
 use tracing::{error, info, trace, warn};
 
+#[maybe_async::maybe_async]
+pub trait Dispatcher: Send + Sync + 'static {
+    async fn send(&self, stream: Arc<Identifier>, topic: Arc<Identifier>, 
msgs: Vec<IggyMessage>, partitioning: Option<Arc<Partitioning>>) -> Result<(), 
iggy_common::IggyError>;
+    async fn shutdown(&mut self);
+}
+
+pub struct BackgroundDispatcher {
+    dispatcher: ProducerDispatcher,
+}
+
+#[maybe_async::async_impl]
+impl Dispatcher for BackgroundDispatcher {
+    async fn send(&self, stream: Arc<Identifier>, topic: Arc<Identifier>, 
msgs: Vec<IggyMessage>, partitioning: Option<Arc<Partitioning>>) -> Result<(), 
iggy_common::IggyError> {
+        self.dispatcher.dispatch(msgs, stream, topic, partitioning).await
+    }
+
+    async fn shutdown(&mut self) {
+        self.dispatcher.shutdown().await
+    }
+}
+
+pub struct DirectDispatcher<R: Runtime> {
+    core: Arc<ProducerCore<R>>,
+    config: DirectConfig,
+}
+
+impl<R: Runtime> DirectDispatcher<R> {
+    pub fn new(core: Arc<ProducerCore<R>>, config: DirectConfig) -> Self {
+        Self { core, config }
+    }
+}
+
+#[maybe_async::maybe_async]
+impl<R: Runtime + 'static> Dispatcher for DirectDispatcher<R> {
+    async fn send(&self, stream: Arc<Identifier>, topic: Arc<Identifier>, mut 
msgs: Vec<IggyMessage>, partitioning: Option<Arc<Partitioning>>) -> Result<(), 
iggy_common::IggyError> {
+        if msgs.is_empty() {
+            return Ok(());
+        }
+
+        // Handle linger time
+        let linger_time_micros = self.config.linger_time.as_micros();
+        if linger_time_micros > 0 {
+            self.core.wait_before_sending(
+                linger_time_micros,
+                self.core.last_sent_at.load(ORDERING)
+            ).await;
+        }
+
+        // Handle batching
+        let max_batch = if self.config.batch_length == 0 {
+            MAX_BATCH_LENGTH
+        } else {
+            self.config.batch_length as usize
+        };
+
+        while !msgs.is_empty() {
+            let batch_size = max_batch.min(msgs.len());
+
+            // Split off the batch from the front of the vector
+            let chunk: Vec<IggyMessage> = msgs.drain(0..batch_size).collect();
+
+            if let Err(err) = self.core.send_raw(&stream, &topic, chunk, 
partitioning.clone()).await {
+                // Return remaining messages as failed
+                return Err(self.core.make_failed_error(err, msgs));
+            }
+
+            self.core.last_sent_at.store(IggyTimestamp::now().into(), 
ORDERING);
+            // No need to update index since we're draining from the front
+        }
+
+        Ok(())
+    }
+
+    async fn shutdown(&mut self) {}
+}
+
+
 #[cfg(test)]
 use mockall::automock;
 
@@ -72,7 +148,6 @@ pub struct ProducerCore<R: Runtime> {
     last_sent_at: Arc<AtomicU64>,
     send_retries_count: Option<u32>,
     send_retries_interval: Option<IggyDuration>,
-    direct_config: Option<DirectConfig>,
     rt: Arc<R>,
 }
 
@@ -155,7 +230,8 @@ impl<R: Runtime> ProducerCore<R> {
 
         let can_send = self.can_send.clone();
 
-        tokio::spawn(async move {
+        #[cfg(feature = "async")]
+        self.rt.spawn(Box::new(async move {
             while let Some(event) = receiver.next().await {
                 trace!("Received diagnostic event: {event}");
                 match event {
@@ -179,7 +255,35 @@ impl<R: Runtime> ProducerCore<R> {
                     }
                 }
             }
-        });
+        }));
+
+        #[cfg(feature = "sync")]
+        // TODO change async-broadcast to sync impl
+        self.rt.spawn(Box::new(move || {
+            // while let Some(event) = receiver.next() {
+            //     trace!("Received diagnostic event: {event}");
+            //     match event {
+            //         DiagnosticEvent::Shutdown => {
+            //             can_send.store(false, ORDERING);
+            //             warn!("Client has been shutdown");
+            //         }
+            //         DiagnosticEvent::Connected => {
+            //             can_send.store(false, ORDERING);
+            //             trace!("Connected to the server");
+            //         }
+            //         DiagnosticEvent::Disconnected => {
+            //             can_send.store(false, ORDERING);
+            //             warn!("Disconnected from the server");
+            //         }
+            //         DiagnosticEvent::SignedIn => {
+            //             can_send.store(true, ORDERING);
+            //         }
+            //         DiagnosticEvent::SignedOut => {
+            //             can_send.store(false, ORDERING);
+            //         }
+            //     }
+            // }
+        }));
     }
 
     #[maybe_async::maybe_async]
@@ -363,11 +467,10 @@ impl<R: Runtime> ProducerCore<R> {
             topic_name: self.topic_name.clone(),
         }
     }
-}
 
-#[maybe_async::maybe_async]
-impl<R: Runtime + 'static> ProducerCoreBackend for ProducerCore<R> {
-    async fn send_internal(
+    /// Raw send method without batching or linger time - for use by 
dispatchers
+    #[maybe_async::maybe_async]
+    pub async fn send_raw(
         &self,
         stream: &Identifier,
         topic: &Identifier,
@@ -389,53 +492,32 @@ impl<R: Runtime + 'static> ProducerCoreBackend for 
ProducerCore<R> {
             }
         };
 
-        match &self.direct_config {
-            Some(cfg) => {
-                let linger_time_micros = cfg.linger_time.as_micros();
-                if linger_time_micros > 0 {
-                    self.wait_before_sending(linger_time_micros, 
self.last_sent_at.load(ORDERING))
-                        .await;
-                }
-
-                let max = if cfg.batch_length == 0 {
-                    MAX_BATCH_LENGTH
-                } else {
-                    cfg.batch_length as usize
-                };
-                let mut index = 0;
-                while index < msgs.len() {
-                    let end = (index + max).min(msgs.len());
-                    let chunk = &mut msgs[index..end];
-
-                    if let Err(err) = self.try_send_messages(stream, topic, 
&part, chunk).await {
-                        let failed_tail = msgs.split_off(index);
-                        return Err(self.make_failed_error(err, failed_tail));
-                    }
-                    self.last_sent_at
-                        .store(IggyTimestamp::now().into(), ORDERING);
-                    index = end;
-                }
-            }
-            // background send on
-            _ => {
-                self.try_send_messages(stream, topic, &part, &mut msgs)
-                    .await
-                    .map_err(|err| self.make_failed_error(err, msgs))?;
-                self.last_sent_at
-                    .store(IggyTimestamp::now().into(), ORDERING);
-            }
-        }
+        self.try_send_messages(stream, topic, &part, &mut msgs)
+            .await
+            .map_err(|err| self.make_failed_error(err, msgs))
+    }
+}
 
-        Ok(())
+#[maybe_async::maybe_async]
+impl<R: Runtime + 'static> ProducerCoreBackend for ProducerCore<R> {
+    async fn send_internal(
+        &self,
+        stream: &Identifier,
+        topic: &Identifier,
+        msgs: Vec<IggyMessage>,
+        partitioning: Option<Arc<Partitioning>>,
+    ) -> Result<(), IggyError> {
+        // Simple pass-through to send_raw for backward compatibility
+        self.send_raw(stream, topic, msgs, partitioning).await
     }
 }
 
-pub struct IggyProducer<R: Runtime> {
+pub struct IggyProducer<R: Runtime, D: Dispatcher> {
     core: Arc<ProducerCore<R>>,
-    dispatcher: Option<ProducerDispatcher>,
+    dispatcher: D,
 }
 
-impl<R: Runtime> IggyProducer<R> {
+impl<R: Runtime + 'static, D: Dispatcher> IggyProducer<R, D> {
     #[allow(clippy::too_many_arguments)]
     pub(crate) fn new(
         client: IggySharedMut<ClientWrapper>,
@@ -454,8 +536,8 @@ impl<R: Runtime> IggyProducer<R> {
         topic_max_size: MaxTopicSize,
         send_retries_count: Option<u32>,
         send_retries_interval: Option<IggyDuration>,
-        mode: SendMode,
         rt: Arc<R>,
+        dispatcher: D,
     ) -> Self {
         let core = Arc::new(ProducerCore {
             initialized: AtomicBool::new(false),
@@ -478,16 +560,8 @@ impl<R: Runtime> IggyProducer<R> {
             last_sent_at: Arc::new(AtomicU64::new(0)),
             send_retries_count,
             send_retries_interval,
-            direct_config: match mode {
-                SendMode::Direct(ref cfg) => Some(cfg.clone()),
-                _ => None,
-            },
             rt,
         });
-        let dispatcher = match mode {
-            SendMode::Background(cfg) => 
Some(ProducerDispatcher::new(core.clone(), cfg)),
-            _ => None,
-        };
 
         Self { core, dispatcher }
     }
@@ -508,6 +582,7 @@ impl<R: Runtime> IggyProducer<R> {
         self.core.init().await
     }
 
+    #[maybe_async::maybe_async]
     pub async fn send(&self, messages: Vec<IggyMessage>) -> Result<(), 
IggyError> {
         if messages.is_empty() {
             trace!("No messages to send.");
@@ -517,20 +592,15 @@ impl<R: Runtime> IggyProducer<R> {
         let stream_id = self.core.stream_id.clone();
         let topic_id = self.core.topic_id.clone();
 
-        match &self.dispatcher {
-            Some(disp) => disp.dispatch(messages, stream_id, topic_id, 
None).await,
-            None => {
-                self.core
-                    .send_internal(&stream_id, &topic_id, messages, None)
-                    .await
-            }
-        }
+        self.dispatcher.send(stream_id, topic_id, messages, None).await
     }
 
+    #[maybe_async::maybe_async]
     pub async fn send_one(&self, message: IggyMessage) -> Result<(), 
IggyError> {
         self.send(vec![message]).await
     }
 
+    #[maybe_async::maybe_async]
     pub async fn send_with_partitioning(
         &self,
         messages: Vec<IggyMessage>,
@@ -544,19 +614,10 @@ impl<R: Runtime> IggyProducer<R> {
         let stream_id = self.core.stream_id.clone();
         let topic_id = self.core.topic_id.clone();
 
-        match &self.dispatcher {
-            Some(disp) => {
-                disp.dispatch(messages, stream_id, topic_id, partitioning)
-                    .await
-            }
-            None => {
-                self.core
-                    .send_internal(&stream_id, &topic_id, messages, 
partitioning)
-                    .await
-            }
-        }
+        self.dispatcher.send(stream_id, topic_id, messages, partitioning).await
     }
 
+    #[maybe_async::maybe_async]
     pub async fn send_to(
         &self,
         stream: Arc<Identifier>,
@@ -569,19 +630,11 @@ impl<R: Runtime> IggyProducer<R> {
             return Ok(());
         }
 
-        match &self.dispatcher {
-            Some(disp) => disp.dispatch(messages, stream, topic, 
partitioning).await,
-            None => {
-                self.core
-                    .send_internal(&stream, &topic, messages, partitioning)
-                    .await
-            }
-        }
+        self.dispatcher.send(stream, topic, messages, partitioning).await
     }
 
-    pub async fn shutdown(self) {
-        if let Some(disp) = self.dispatcher {
-            disp.shutdown().await;
-        }
+    #[maybe_async::maybe_async]
+    pub async fn shutdown(&mut self) {
+        self.dispatcher.shutdown().await
     }
 }
diff --git a/core/sdk/src/clients/producer_builder.rs 
b/core/sdk/src/clients/producer_builder.rs
index 03b9a8d6..b9866aac 100644
--- a/core/sdk/src/clients/producer_builder.rs
+++ b/core/sdk/src/clients/producer_builder.rs
@@ -17,7 +17,8 @@
 
 use crate::client_wrappers::ClientWrapper;
 use crate::clients::producer_config::{BackgroundConfig, DirectConfig};
-use crate::prelude::IggyProducer;
+use crate::clients::producer::{IggyProducer, DirectDispatcher, 
BackgroundDispatcher};
+use crate::runtime::Runtime;
 use iggy_common::locking::IggySharedMut;
 use iggy_common::{
     EncryptorKind, Identifier, IggyDuration, IggyExpiry, MaxTopicSize, 
Partitioner, Partitioning,
diff --git a/core/sdk/src/clients/producer_dispatcher.rs 
b/core/sdk/src/clients/producer_dispatcher.rs
index fa6dded4..0a1e88bd 100644
--- a/core/sdk/src/clients/producer_dispatcher.rs
+++ b/core/sdk/src/clients/producer_dispatcher.rs
@@ -33,7 +33,7 @@ pub struct ProducerDispatcher {
     slots_permit: Arc<Semaphore>,
     bytes_permit: Arc<Semaphore>,
     stop_tx: broadcast::Sender<()>,
-    _join_handle: JoinHandle<()>,
+    _join_handle: Option<JoinHandle<()>>,
 }
 
 impl ProducerDispatcher {
@@ -98,7 +98,7 @@ impl ProducerDispatcher {
             bytes_permit: Arc::new(Semaphore::new(bytes_permit)),
             slots_permit: Arc::new(Semaphore::new(slot_permit)),
             stop_tx,
-            _join_handle: handle,
+            _join_handle: Some(handle),
         }
     }
 
@@ -211,21 +211,20 @@ impl ProducerDispatcher {
             .await
     }
 
-    pub async fn shutdown(mut self) {
+    pub async fn shutdown(&mut self) {
         if self.closed.swap(true, Ordering::Relaxed) {
             return;
         }
-
         let _ = self.stop_tx.send(());
-
-        for shard in self.shards.drain(..) {
+        for shard in std::mem::take(&mut self.shards).into_iter() {
             if let Err(e) = shard._handle.await {
                 tracing::error!("shard panicked: {e:?}");
             }
         }
-
-        if let Err(e) = self._join_handle.await {
-            tracing::error!("error-worker panicked: {e:?}");
+        if let Some(j) = self._join_handle.take() {
+            if let Err(e) = j.await {
+                tracing::error!("error-worker panicked: {e:?}");
+            }
         }
     }
 }

Reply via email to