This is an automated email from the ASF dual-hosted git repository.
bashirbekov pushed a commit to branch add-sync-client
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/add-sync-client by this push:
new bc17f18b del
bc17f18b is described below
commit bc17f18baf1805778fe46b317b61e1a3dfb7007a
Author: haze518 <[email protected]>
AuthorDate: Mon Sep 22 12:03:07 2025 +0600
del
---
core/sdk/src/clients/producer.rs | 45 ++++++++++++++++++++++++----------------
core/sdk/src/lib.rs | 1 +
core/sdk/src/runtime/mod.rs | 30 +++++++++++++++++++++++++++
3 files changed, 58 insertions(+), 18 deletions(-)
diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index ac21d0b5..47b31308 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -21,6 +21,7 @@ use crate::clients::MAX_BATCH_LENGTH;
use crate::clients::producer_builder::SendMode;
use crate::clients::producer_config::DirectConfig;
use crate::clients::producer_dispatcher::ProducerDispatcher;
+use crate::runtime::{Interval, Runtime};
use bytes::Bytes;
use futures_util::StreamExt;
use iggy_binary_protocol::{Client, MessageClient, StreamClient, TopicClient};
@@ -33,24 +34,24 @@ use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicBool, AtomicU64};
use std::time::Duration;
-use tokio::time::{Interval, sleep};
use tracing::{error, info, trace, warn};
#[cfg(test)]
use mockall::automock;
+#[maybe_async::maybe_async]
#[cfg_attr(test, automock)]
pub trait ProducerCoreBackend: Send + Sync + 'static {
- fn send_internal(
+ async fn send_internal(
&self,
stream: &Identifier,
topic: &Identifier,
msgs: Vec<IggyMessage>,
partitioning: Option<Arc<Partitioning>>,
- ) -> impl Future<Output = Result<(), IggyError>> + Send;
+ ) -> Result<(), IggyError>;
}
-pub struct ProducerCore {
+pub struct ProducerCore<R: Runtime> {
initialized: AtomicBool,
can_send: Arc<AtomicBool>,
client: Arc<IggySharedMut<ClientWrapper>>,
@@ -72,9 +73,11 @@ pub struct ProducerCore {
send_retries_count: Option<u32>,
send_retries_interval: Option<IggyDuration>,
direct_config: Option<DirectConfig>,
+ rt: Arc<R>,
}
-impl ProducerCore {
+impl<R: Runtime> ProducerCore<R> {
+ #[maybe_async::maybe_async]
pub async fn init(&self) -> Result<(), IggyError> {
if self.initialized.load(Ordering::SeqCst) {
return Ok(());
@@ -141,6 +144,7 @@ impl ProducerCore {
Ok(())
}
+ #[maybe_async::maybe_async]
async fn subscribe_events(&self) {
trace!("Subscribing to diagnostic events");
let mut receiver;
@@ -178,6 +182,7 @@ impl ProducerCore {
});
}
+ #[maybe_async::maybe_async]
async fn try_send_messages(
&self,
stream: &Identifier,
@@ -199,7 +204,7 @@ impl ProducerCore {
}
let mut timer = if let Some(interval) = self.send_retries_interval {
- let mut timer = tokio::time::interval(interval.get_duration());
+ let mut timer = self.rt.interval(interval);
timer.tick().await;
Some(timer)
} else {
@@ -219,12 +224,13 @@ impl ProducerCore {
.await
}
+ #[maybe_async::maybe_async]
async fn wait_until_connected(
&self,
max_retries: u32,
stream: &Identifier,
topic: &Identifier,
- timer: &mut Option<Interval>,
+ timer: &mut Option<R::Interval>,
) -> Result<(), IggyError> {
let mut retries = 0;
while !self.can_send.load(ORDERING) {
@@ -253,6 +259,7 @@ impl ProducerCore {
Ok(())
}
+ #[maybe_async::maybe_async]
async fn send_with_retries(
&self,
max_retries: u32,
@@ -260,7 +267,7 @@ impl ProducerCore {
topic: &Identifier,
partitioning: &Arc<Partitioning>,
messages: &mut [IggyMessage],
- timer: &mut Option<Interval>,
+ timer: &mut Option<R::Interval>,
) -> Result<(), IggyError> {
let client = self.client.read().await;
let mut retries = 0;
@@ -328,7 +335,8 @@ impl ProducerCore {
}
}
- async fn wait_before_sending(interval: u64, last_sent_at: u64) {
+ #[maybe_async::maybe_async]
+ async fn wait_before_sending(&self, interval: u64, last_sent_at: u64) {
if interval == 0 {
return;
}
@@ -344,7 +352,7 @@ impl ProducerCore {
trace!(
"Waiting for {remaining} microseconds before sending messages...
{interval} - {elapsed} = {remaining}"
);
- sleep(Duration::from_micros(remaining)).await;
+
self.rt.sleep(IggyDuration::new(Duration::from_micros(remaining))).await;
}
fn make_failed_error(&self, cause: IggyError, failed: Vec<IggyMessage>) ->
IggyError {
@@ -357,7 +365,8 @@ impl ProducerCore {
}
}
-impl ProducerCoreBackend for ProducerCore {
+#[maybe_async::maybe_async]
+impl<R: Runtime + 'static> ProducerCoreBackend for ProducerCore<R> {
async fn send_internal(
&self,
stream: &Identifier,
@@ -384,7 +393,7 @@ impl ProducerCoreBackend for ProducerCore {
Some(cfg) => {
let linger_time_micros = cfg.linger_time.as_micros();
if linger_time_micros > 0 {
- Self::wait_before_sending(linger_time_micros,
self.last_sent_at.load(ORDERING))
+ self.wait_before_sending(linger_time_micros,
self.last_sent_at.load(ORDERING))
.await;
}
@@ -421,15 +430,12 @@ impl ProducerCoreBackend for ProducerCore {
}
}
-unsafe impl Send for IggyProducer {}
-unsafe impl Sync for IggyProducer {}
-
-pub struct IggyProducer {
- core: Arc<ProducerCore>,
+pub struct IggyProducer<R: Runtime> {
+ core: Arc<ProducerCore<R>>,
dispatcher: Option<ProducerDispatcher>,
}
-impl IggyProducer {
+impl<R: Runtime> IggyProducer<R> {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
client: IggySharedMut<ClientWrapper>,
@@ -449,6 +455,7 @@ impl IggyProducer {
send_retries_count: Option<u32>,
send_retries_interval: Option<IggyDuration>,
mode: SendMode,
+ rt: Arc<R>,
) -> Self {
let core = Arc::new(ProducerCore {
initialized: AtomicBool::new(false),
@@ -475,6 +482,7 @@ impl IggyProducer {
SendMode::Direct(ref cfg) => Some(cfg.clone()),
_ => None,
},
+ rt,
});
let dispatcher = match mode {
SendMode::Background(cfg) =>
Some(ProducerDispatcher::new(core.clone(), cfg)),
@@ -495,6 +503,7 @@ impl IggyProducer {
/// Initializes the producer by subscribing to diagnostic events, creating
the stream and topic if they do not exist etc.
///
/// Note: This method must be invoked before producing messages.
+ #[maybe_async::maybe_async]
pub async fn init(&self) -> Result<(), IggyError> {
self.core.init().await
}
diff --git a/core/sdk/src/lib.rs b/core/sdk/src/lib.rs
index e57b7d04..34885a42 100644
--- a/core/sdk/src/lib.rs
+++ b/core/sdk/src/lib.rs
@@ -30,3 +30,4 @@ pub mod stream_builder;
pub mod tcp;
pub mod protocol;
pub mod connection;
+pub mod runtime;
diff --git a/core/sdk/src/runtime/mod.rs b/core/sdk/src/runtime/mod.rs
new file mode 100644
index 00000000..f4747bfb
--- /dev/null
+++ b/core/sdk/src/runtime/mod.rs
@@ -0,0 +1,30 @@
+use std::{fmt::Debug, time::Instant};
+use iggy_common::IggyDuration;
+
+#[cfg(feature = "async")]
+use std::pin::Pin;
+
+#[cfg(feature = "async")]
+pub type Job = Box<
+ dyn FnOnce() -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>
+ + Send
+ + 'static,
+>;
+
+#[cfg(feature = "sync")]
+pub type Job = Box<dyn FnOnce() + Send + 'static>;
+
+#[maybe_async::maybe_async(AFIT)]
+pub trait Interval {
+ async fn tick(&self) -> Instant;
+}
+
+#[maybe_async::maybe_async(AFIT)]
+pub trait Runtime: Send + Sync + Debug {
+ type Join: Send + 'static;
+ type Interval: Interval + Send + 'static;
+
+ async fn sleep(&self, dur: IggyDuration);
+ fn spawn(&self, job: Job);
+ fn interval(&self, dur: IggyDuration) -> Self::Interval;
+}