This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch add-sync-client
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/add-sync-client by this push:
     new bc17f18b del
bc17f18b is described below

commit bc17f18baf1805778fe46b317b61e1a3dfb7007a
Author: haze518 <[email protected]>
AuthorDate: Mon Sep 22 12:03:07 2025 +0600

    del
---
 core/sdk/src/clients/producer.rs | 45 ++++++++++++++++++++++++----------------
 core/sdk/src/lib.rs              |  1 +
 core/sdk/src/runtime/mod.rs      | 30 +++++++++++++++++++++++++++
 3 files changed, 58 insertions(+), 18 deletions(-)

diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index ac21d0b5..47b31308 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -21,6 +21,7 @@ use crate::clients::MAX_BATCH_LENGTH;
 use crate::clients::producer_builder::SendMode;
 use crate::clients::producer_config::DirectConfig;
 use crate::clients::producer_dispatcher::ProducerDispatcher;
+use crate::runtime::{Interval, Runtime};
 use bytes::Bytes;
 use futures_util::StreamExt;
 use iggy_binary_protocol::{Client, MessageClient, StreamClient, TopicClient};
@@ -33,24 +34,24 @@ use std::sync::Arc;
 use std::sync::atomic::Ordering;
 use std::sync::atomic::{AtomicBool, AtomicU64};
 use std::time::Duration;
-use tokio::time::{Interval, sleep};
 use tracing::{error, info, trace, warn};
 
 #[cfg(test)]
 use mockall::automock;
 
+#[maybe_async::maybe_async]
 #[cfg_attr(test, automock)]
 pub trait ProducerCoreBackend: Send + Sync + 'static {
-    fn send_internal(
+    async fn send_internal(
         &self,
         stream: &Identifier,
         topic: &Identifier,
         msgs: Vec<IggyMessage>,
         partitioning: Option<Arc<Partitioning>>,
-    ) -> impl Future<Output = Result<(), IggyError>> + Send;
+    ) -> Result<(), IggyError>;
 }
 
-pub struct ProducerCore {
+pub struct ProducerCore<R: Runtime> {
     initialized: AtomicBool,
     can_send: Arc<AtomicBool>,
     client: Arc<IggySharedMut<ClientWrapper>>,
@@ -72,9 +73,11 @@ pub struct ProducerCore {
     send_retries_count: Option<u32>,
     send_retries_interval: Option<IggyDuration>,
     direct_config: Option<DirectConfig>,
+    rt: Arc<R>,
 }
 
-impl ProducerCore {
+impl<R: Runtime> ProducerCore<R> {
+    #[maybe_async::maybe_async]
     pub async fn init(&self) -> Result<(), IggyError> {
         if self.initialized.load(Ordering::SeqCst) {
             return Ok(());
@@ -141,6 +144,7 @@ impl ProducerCore {
         Ok(())
     }
 
+    #[maybe_async::maybe_async]
     async fn subscribe_events(&self) {
         trace!("Subscribing to diagnostic events");
         let mut receiver;
@@ -178,6 +182,7 @@ impl ProducerCore {
         });
     }
 
+    #[maybe_async::maybe_async]
     async fn try_send_messages(
         &self,
         stream: &Identifier,
@@ -199,7 +204,7 @@ impl ProducerCore {
         }
 
         let mut timer = if let Some(interval) = self.send_retries_interval {
-            let mut timer = tokio::time::interval(interval.get_duration());
+            let mut timer = self.rt.interval(interval);
             timer.tick().await;
             Some(timer)
         } else {
@@ -219,12 +224,13 @@ impl ProducerCore {
         .await
     }
 
+    #[maybe_async::maybe_async]
     async fn wait_until_connected(
         &self,
         max_retries: u32,
         stream: &Identifier,
         topic: &Identifier,
-        timer: &mut Option<Interval>,
+        timer: &mut Option<R::Interval>,
     ) -> Result<(), IggyError> {
         let mut retries = 0;
         while !self.can_send.load(ORDERING) {
@@ -253,6 +259,7 @@ impl ProducerCore {
         Ok(())
     }
 
+    #[maybe_async::maybe_async]
     async fn send_with_retries(
         &self,
         max_retries: u32,
@@ -260,7 +267,7 @@ impl ProducerCore {
         topic: &Identifier,
         partitioning: &Arc<Partitioning>,
         messages: &mut [IggyMessage],
-        timer: &mut Option<Interval>,
+        timer: &mut Option<R::Interval>,
     ) -> Result<(), IggyError> {
         let client = self.client.read().await;
         let mut retries = 0;
@@ -328,7 +335,8 @@ impl ProducerCore {
         }
     }
 
-    async fn wait_before_sending(interval: u64, last_sent_at: u64) {
+    #[maybe_async::maybe_async]
+    async fn wait_before_sending(&self, interval: u64, last_sent_at: u64) {
         if interval == 0 {
             return;
         }
@@ -344,7 +352,7 @@ impl ProducerCore {
         trace!(
             "Waiting for {remaining} microseconds before sending messages... {interval} - {elapsed} = {remaining}"
         );
-        sleep(Duration::from_micros(remaining)).await;
+        self.rt.sleep(IggyDuration::new(Duration::from_micros(remaining))).await;
     }
 
     fn make_failed_error(&self, cause: IggyError, failed: Vec<IggyMessage>) -> IggyError {
@@ -357,7 +365,8 @@ impl ProducerCore {
     }
 }
 
-impl ProducerCoreBackend for ProducerCore {
+#[maybe_async::maybe_async]
+impl<R: Runtime + 'static> ProducerCoreBackend for ProducerCore<R> {
     async fn send_internal(
         &self,
         stream: &Identifier,
@@ -384,7 +393,7 @@ impl ProducerCoreBackend for ProducerCore {
             Some(cfg) => {
                 let linger_time_micros = cfg.linger_time.as_micros();
                 if linger_time_micros > 0 {
-                    Self::wait_before_sending(linger_time_micros, self.last_sent_at.load(ORDERING))
+                    self.wait_before_sending(linger_time_micros, self.last_sent_at.load(ORDERING))
                         .await;
                 }
 
@@ -421,15 +430,12 @@ impl ProducerCoreBackend for ProducerCore {
     }
 }
 
-unsafe impl Send for IggyProducer {}
-unsafe impl Sync for IggyProducer {}
-
-pub struct IggyProducer {
-    core: Arc<ProducerCore>,
+pub struct IggyProducer<R: Runtime> {
+    core: Arc<ProducerCore<R>>,
     dispatcher: Option<ProducerDispatcher>,
 }
 
-impl IggyProducer {
+impl<R: Runtime> IggyProducer<R> {
     #[allow(clippy::too_many_arguments)]
     pub(crate) fn new(
         client: IggySharedMut<ClientWrapper>,
@@ -449,6 +455,7 @@ impl IggyProducer {
         send_retries_count: Option<u32>,
         send_retries_interval: Option<IggyDuration>,
         mode: SendMode,
+        rt: Arc<R>,
     ) -> Self {
         let core = Arc::new(ProducerCore {
             initialized: AtomicBool::new(false),
@@ -475,6 +482,7 @@ impl IggyProducer {
                 SendMode::Direct(ref cfg) => Some(cfg.clone()),
                 _ => None,
             },
+            rt,
         });
         let dispatcher = match mode {
             SendMode::Background(cfg) => Some(ProducerDispatcher::new(core.clone(), cfg)),
@@ -495,6 +503,7 @@ impl IggyProducer {
     /// Initializes the producer by subscribing to diagnostic events, creating the stream and topic if they do not exist etc.
     ///
     /// Note: This method must be invoked before producing messages.
+    #[maybe_async::maybe_async]
     pub async fn init(&self) -> Result<(), IggyError> {
         self.core.init().await
     }
diff --git a/core/sdk/src/lib.rs b/core/sdk/src/lib.rs
index e57b7d04..34885a42 100644
--- a/core/sdk/src/lib.rs
+++ b/core/sdk/src/lib.rs
@@ -30,3 +30,4 @@ pub mod stream_builder;
 pub mod tcp;
 pub mod protocol;
 pub mod connection;
+pub mod runtime;
diff --git a/core/sdk/src/runtime/mod.rs b/core/sdk/src/runtime/mod.rs
new file mode 100644
index 00000000..f4747bfb
--- /dev/null
+++ b/core/sdk/src/runtime/mod.rs
@@ -0,0 +1,30 @@
+use std::{fmt::Debug, time::Instant};
+use iggy_common::IggyDuration;
+
+#[cfg(feature = "async")]
+use std::pin::Pin;
+
+#[cfg(feature = "async")]
+pub type Job = Box<
+    dyn FnOnce() -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>
+    + Send
+    + 'static,
+>;
+
+#[cfg(feature = "sync")]
+pub type Job = Box<dyn FnOnce() + Send + 'static>;
+
+#[maybe_async::maybe_async(AFIT)]
+pub trait Interval {
+    async fn tick(&self) -> Instant;
+}
+
+#[maybe_async::maybe_async(AFIT)]
+pub trait Runtime: Send + Sync + Debug {
+    type Join: Send + 'static;
+    type Interval: Interval + Send + 'static;
+
+    async fn sleep(&self, dur: IggyDuration);
+    fn spawn(&self, job: Job);
+    fn interval(&self, dur: IggyDuration) -> Self::Interval;
+}

Reply via email to