This is an automated email from the ASF dual-hosted git repository.
bashirbekov pushed a commit to branch add-sync-client
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/add-sync-client by this push:
new 60363657 del
60363657 is described below
commit 60363657e94c40ada4bef92bda1b1720e3d3abbb
Author: haze518 <[email protected]>
AuthorDate: Sat Sep 27 14:03:02 2025 +0600
del
---
core/sdk/src/clients/mod.rs | 2 +
core/sdk/src/clients/producer.rs | 229 +++++++++++++++++-----------
core/sdk/src/clients/producer_builder.rs | 3 +-
core/sdk/src/clients/producer_dispatcher.rs | 17 +--
4 files changed, 153 insertions(+), 98 deletions(-)
diff --git a/core/sdk/src/clients/mod.rs b/core/sdk/src/clients/mod.rs
index 9f51cc52..df84b3b6 100644
--- a/core/sdk/src/clients/mod.rs
+++ b/core/sdk/src/clients/mod.rs
@@ -33,8 +33,10 @@ pub mod consumer_builder;
pub mod producer;
pub mod producer_builder;
pub mod producer_config;
+// #[cfg(feature = "async")]
pub mod producer_dispatcher;
pub mod producer_error_callback;
+// #[cfg(feature = "async")]
pub mod producer_sharding;
const ORDERING: std::sync::atomic::Ordering =
std::sync::atomic::Ordering::SeqCst;
diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index 47b31308..6d235191 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -18,7 +18,6 @@
use super::ORDERING;
use crate::client_wrappers::ClientWrapper;
use crate::clients::MAX_BATCH_LENGTH;
-use crate::clients::producer_builder::SendMode;
use crate::clients::producer_config::DirectConfig;
use crate::clients::producer_dispatcher::ProducerDispatcher;
use crate::runtime::{Interval, Runtime};
@@ -36,6 +35,83 @@ use std::sync::atomic::{AtomicBool, AtomicU64};
use std::time::Duration;
use tracing::{error, info, trace, warn};
+#[maybe_async::maybe_async]
+pub trait Dispatcher: Send + Sync + 'static {
+    async fn send(&self, stream: Arc<Identifier>, topic: Arc<Identifier>, msgs: Vec<IggyMessage>, partitioning: Option<Arc<Partitioning>>) -> Result<(), iggy_common::IggyError>;
+ async fn shutdown(&mut self);
+}
+
+pub struct BackgroundDispatcher {
+ dispatcher: ProducerDispatcher,
+}
+
+#[maybe_async::async_impl]
+impl Dispatcher for BackgroundDispatcher {
+    async fn send(&self, stream: Arc<Identifier>, topic: Arc<Identifier>, msgs: Vec<IggyMessage>, partitioning: Option<Arc<Partitioning>>) -> Result<(), iggy_common::IggyError> {
+ self.dispatcher.dispatch(msgs, stream, topic, partitioning).await
+ }
+
+ async fn shutdown(&mut self) {
+ self.dispatcher.shutdown().await
+ }
+}
+
+pub struct DirectDispatcher<R: Runtime> {
+ core: Arc<ProducerCore<R>>,
+ config: DirectConfig,
+}
+
+impl<R: Runtime> DirectDispatcher<R> {
+ pub fn new(core: Arc<ProducerCore<R>>, config: DirectConfig) -> Self {
+ Self { core, config }
+ }
+}
+
+#[maybe_async::maybe_async]
+impl<R: Runtime + 'static> Dispatcher for DirectDispatcher<R> {
+    async fn send(&self, stream: Arc<Identifier>, topic: Arc<Identifier>, mut msgs: Vec<IggyMessage>, partitioning: Option<Arc<Partitioning>>) -> Result<(), iggy_common::IggyError> {
+ if msgs.is_empty() {
+ return Ok(());
+ }
+
+ // Handle linger time
+ let linger_time_micros = self.config.linger_time.as_micros();
+ if linger_time_micros > 0 {
+ self.core.wait_before_sending(
+ linger_time_micros,
+ self.core.last_sent_at.load(ORDERING)
+ ).await;
+ }
+
+ // Handle batching
+ let max_batch = if self.config.batch_length == 0 {
+ MAX_BATCH_LENGTH
+ } else {
+ self.config.batch_length as usize
+ };
+
+ while !msgs.is_empty() {
+ let batch_size = max_batch.min(msgs.len());
+
+ // Split off the batch from the front of the vector
+ let chunk: Vec<IggyMessage> = msgs.drain(0..batch_size).collect();
+
+            if let Err(err) = self.core.send_raw(&stream, &topic, chunk, partitioning.clone()).await {
+ // Return remaining messages as failed
+ return Err(self.core.make_failed_error(err, msgs));
+ }
+
+            self.core.last_sent_at.store(IggyTimestamp::now().into(), ORDERING);
+ // No need to update index since we're draining from the front
+ }
+
+ Ok(())
+ }
+
+ async fn shutdown(&mut self) {}
+}
+
+
#[cfg(test)]
use mockall::automock;
@@ -72,7 +148,6 @@ pub struct ProducerCore<R: Runtime> {
last_sent_at: Arc<AtomicU64>,
send_retries_count: Option<u32>,
send_retries_interval: Option<IggyDuration>,
- direct_config: Option<DirectConfig>,
rt: Arc<R>,
}
@@ -155,7 +230,8 @@ impl<R: Runtime> ProducerCore<R> {
let can_send = self.can_send.clone();
- tokio::spawn(async move {
+ #[cfg(feature = "async")]
+ self.rt.spawn(Box::new(async move {
while let Some(event) = receiver.next().await {
trace!("Received diagnostic event: {event}");
match event {
@@ -179,7 +255,35 @@ impl<R: Runtime> ProducerCore<R> {
}
}
}
- });
+ }));
+
+ #[cfg(feature = "sync")]
+ // TODO change async-broadcast to sync impl
+ self.rt.spawn(Box::new(move || {
+ // while let Some(event) = receiver.next() {
+ // trace!("Received diagnostic event: {event}");
+ // match event {
+ // DiagnosticEvent::Shutdown => {
+ // can_send.store(false, ORDERING);
+ // warn!("Client has been shutdown");
+ // }
+ // DiagnosticEvent::Connected => {
+ // can_send.store(false, ORDERING);
+ // trace!("Connected to the server");
+ // }
+ // DiagnosticEvent::Disconnected => {
+ // can_send.store(false, ORDERING);
+ // warn!("Disconnected from the server");
+ // }
+ // DiagnosticEvent::SignedIn => {
+ // can_send.store(true, ORDERING);
+ // }
+ // DiagnosticEvent::SignedOut => {
+ // can_send.store(false, ORDERING);
+ // }
+ // }
+ // }
+ }));
}
#[maybe_async::maybe_async]
@@ -363,11 +467,10 @@ impl<R: Runtime> ProducerCore<R> {
topic_name: self.topic_name.clone(),
}
}
-}
-#[maybe_async::maybe_async]
-impl<R: Runtime + 'static> ProducerCoreBackend for ProducerCore<R> {
- async fn send_internal(
+    /// Raw send method without batching or linger time - for use by dispatchers
+ #[maybe_async::maybe_async]
+ pub async fn send_raw(
&self,
stream: &Identifier,
topic: &Identifier,
@@ -389,53 +492,32 @@ impl<R: Runtime + 'static> ProducerCoreBackend for ProducerCore<R> {
}
};
- match &self.direct_config {
- Some(cfg) => {
- let linger_time_micros = cfg.linger_time.as_micros();
- if linger_time_micros > 0 {
-                    self.wait_before_sending(linger_time_micros, self.last_sent_at.load(ORDERING))
- .await;
- }
-
- let max = if cfg.batch_length == 0 {
- MAX_BATCH_LENGTH
- } else {
- cfg.batch_length as usize
- };
- let mut index = 0;
- while index < msgs.len() {
- let end = (index + max).min(msgs.len());
- let chunk = &mut msgs[index..end];
-
-                    if let Err(err) = self.try_send_messages(stream, topic, &part, chunk).await {
- let failed_tail = msgs.split_off(index);
- return Err(self.make_failed_error(err, failed_tail));
- }
- self.last_sent_at
- .store(IggyTimestamp::now().into(), ORDERING);
- index = end;
- }
- }
- // background send on
- _ => {
- self.try_send_messages(stream, topic, &part, &mut msgs)
- .await
- .map_err(|err| self.make_failed_error(err, msgs))?;
- self.last_sent_at
- .store(IggyTimestamp::now().into(), ORDERING);
- }
- }
+ self.try_send_messages(stream, topic, &part, &mut msgs)
+ .await
+ .map_err(|err| self.make_failed_error(err, msgs))
+ }
+}
- Ok(())
+#[maybe_async::maybe_async]
+impl<R: Runtime + 'static> ProducerCoreBackend for ProducerCore<R> {
+ async fn send_internal(
+ &self,
+ stream: &Identifier,
+ topic: &Identifier,
+ msgs: Vec<IggyMessage>,
+ partitioning: Option<Arc<Partitioning>>,
+ ) -> Result<(), IggyError> {
+ // Simple pass-through to send_raw for backward compatibility
+ self.send_raw(stream, topic, msgs, partitioning).await
}
}
-pub struct IggyProducer<R: Runtime> {
+pub struct IggyProducer<R: Runtime, D: Dispatcher> {
core: Arc<ProducerCore<R>>,
- dispatcher: Option<ProducerDispatcher>,
+ dispatcher: D,
}
-impl<R: Runtime> IggyProducer<R> {
+impl<R: Runtime + 'static, D: Dispatcher> IggyProducer<R, D> {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
client: IggySharedMut<ClientWrapper>,
@@ -454,8 +536,8 @@ impl<R: Runtime> IggyProducer<R> {
topic_max_size: MaxTopicSize,
send_retries_count: Option<u32>,
send_retries_interval: Option<IggyDuration>,
- mode: SendMode,
rt: Arc<R>,
+ dispatcher: D,
) -> Self {
let core = Arc::new(ProducerCore {
initialized: AtomicBool::new(false),
@@ -478,16 +560,8 @@ impl<R: Runtime> IggyProducer<R> {
last_sent_at: Arc::new(AtomicU64::new(0)),
send_retries_count,
send_retries_interval,
- direct_config: match mode {
- SendMode::Direct(ref cfg) => Some(cfg.clone()),
- _ => None,
- },
rt,
});
- let dispatcher = match mode {
-            SendMode::Background(cfg) => Some(ProducerDispatcher::new(core.clone(), cfg)),
- _ => None,
- };
Self { core, dispatcher }
}
@@ -508,6 +582,7 @@ impl<R: Runtime> IggyProducer<R> {
self.core.init().await
}
+ #[maybe_async::maybe_async]
    pub async fn send(&self, messages: Vec<IggyMessage>) -> Result<(), IggyError> {
if messages.is_empty() {
trace!("No messages to send.");
@@ -517,20 +592,15 @@ impl<R: Runtime> IggyProducer<R> {
let stream_id = self.core.stream_id.clone();
let topic_id = self.core.topic_id.clone();
- match &self.dispatcher {
-            Some(disp) => disp.dispatch(messages, stream_id, topic_id, None).await,
- None => {
- self.core
- .send_internal(&stream_id, &topic_id, messages, None)
- .await
- }
- }
+ self.dispatcher.send(stream_id, topic_id, messages, None).await
}
+ #[maybe_async::maybe_async]
    pub async fn send_one(&self, message: IggyMessage) -> Result<(), IggyError> {
self.send(vec![message]).await
}
+ #[maybe_async::maybe_async]
pub async fn send_with_partitioning(
&self,
messages: Vec<IggyMessage>,
@@ -544,19 +614,10 @@ impl<R: Runtime> IggyProducer<R> {
let stream_id = self.core.stream_id.clone();
let topic_id = self.core.topic_id.clone();
- match &self.dispatcher {
- Some(disp) => {
- disp.dispatch(messages, stream_id, topic_id, partitioning)
- .await
- }
- None => {
- self.core
-                    .send_internal(&stream_id, &topic_id, messages, partitioning)
- .await
- }
- }
+ self.dispatcher.send(stream_id, topic_id, messages, partitioning).await
}
+ #[maybe_async::maybe_async]
pub async fn send_to(
&self,
stream: Arc<Identifier>,
@@ -569,19 +630,11 @@ impl<R: Runtime> IggyProducer<R> {
return Ok(());
}
- match &self.dispatcher {
-            Some(disp) => disp.dispatch(messages, stream, topic, partitioning).await,
- None => {
- self.core
- .send_internal(&stream, &topic, messages, partitioning)
- .await
- }
- }
+ self.dispatcher.send(stream, topic, messages, partitioning).await
}
- pub async fn shutdown(self) {
- if let Some(disp) = self.dispatcher {
- disp.shutdown().await;
- }
+ #[maybe_async::maybe_async]
+ pub async fn shutdown(&mut self) {
+ self.dispatcher.shutdown().await
}
}
diff --git a/core/sdk/src/clients/producer_builder.rs b/core/sdk/src/clients/producer_builder.rs
index 03b9a8d6..b9866aac 100644
--- a/core/sdk/src/clients/producer_builder.rs
+++ b/core/sdk/src/clients/producer_builder.rs
@@ -17,7 +17,8 @@
use crate::client_wrappers::ClientWrapper;
use crate::clients::producer_config::{BackgroundConfig, DirectConfig};
-use crate::prelude::IggyProducer;
+use crate::clients::producer::{IggyProducer, DirectDispatcher, BackgroundDispatcher};
+use crate::runtime::Runtime;
use iggy_common::locking::IggySharedMut;
use iggy_common::{
    EncryptorKind, Identifier, IggyDuration, IggyExpiry, MaxTopicSize, Partitioner, Partitioning,
diff --git a/core/sdk/src/clients/producer_dispatcher.rs b/core/sdk/src/clients/producer_dispatcher.rs
index fa6dded4..0a1e88bd 100644
--- a/core/sdk/src/clients/producer_dispatcher.rs
+++ b/core/sdk/src/clients/producer_dispatcher.rs
@@ -33,7 +33,7 @@ pub struct ProducerDispatcher {
slots_permit: Arc<Semaphore>,
bytes_permit: Arc<Semaphore>,
stop_tx: broadcast::Sender<()>,
- _join_handle: JoinHandle<()>,
+ _join_handle: Option<JoinHandle<()>>,
}
impl ProducerDispatcher {
@@ -98,7 +98,7 @@ impl ProducerDispatcher {
bytes_permit: Arc::new(Semaphore::new(bytes_permit)),
slots_permit: Arc::new(Semaphore::new(slot_permit)),
stop_tx,
- _join_handle: handle,
+ _join_handle: Some(handle),
}
}
@@ -211,21 +211,20 @@ impl ProducerDispatcher {
.await
}
- pub async fn shutdown(mut self) {
+ pub async fn shutdown(&mut self) {
if self.closed.swap(true, Ordering::Relaxed) {
return;
}
-
let _ = self.stop_tx.send(());
-
- for shard in self.shards.drain(..) {
+ for shard in std::mem::take(&mut self.shards).into_iter() {
if let Err(e) = shard._handle.await {
tracing::error!("shard panicked: {e:?}");
}
}
-
- if let Err(e) = self._join_handle.await {
- tracing::error!("error-worker panicked: {e:?}");
+ if let Some(j) = self._join_handle.take() {
+ if let Err(e) = j.await {
+ tracing::error!("error-worker panicked: {e:?}");
+ }
}
}
}