This is an automated email from the ASF dual-hosted git repository.
bashirbekov pushed a commit to branch feat/add-background-send
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/feat/add-background-send by
this push:
new 1685b879 del
1685b879 is described below
commit 1685b8798cd95f7b1b9db2784866d0e0f79d47ed
Author: haze518 <[email protected]>
AuthorDate: Thu May 29 06:06:08 2025 +0600
del
---
core/sdk/src/clients/producer.rs | 133 +++++++++++++++++++++------------------
1 file changed, 71 insertions(+), 62 deletions(-)
diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index ba2ccc94..d9355358 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -1,4 +1,4 @@
-use super::send_mode::{shardMessage, BackgroundConfig, BackpressureMode,
Dispatcher, SendMode};
+use super::send_mode::{BackgroundConfig, BackpressureMode, Dispatcher,
SendMode, shardMessage};
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -36,6 +36,20 @@ use tokio::task::JoinHandle;
use tokio::time::{Interval, sleep};
use tracing::{error, info, trace, warn};
+pub trait SendStrategy: Send + Sync + 'static {
+ fn send_batch(
+ &self,
+ stream: &Identifier,
+ topic: &Identifier,
+ msgs: Vec<IggyMessage>,
+ partitioning: Option<Arc<Partitioning>>,
+ ) -> impl Future<Output = Result<(), IggyError>> + Send;
+}
+
+pub trait AsyncSendStrategy: SendStrategy {
+ fn flush(&self) -> Result<(), IggyError>;
+}
+
pub struct ProducerCore {
initialized: AtomicBool,
can_send: Arc<AtomicBool>,
@@ -70,10 +84,11 @@ pub struct ProducerCore {
impl ProducerCore {
pub async fn init(&self) -> Result<(), IggyError> {
- if self.initialized.compare_exchange(
- false, true,
- Ordering::SeqCst, Ordering::SeqCst,
- ).is_err() {
+ if self
+ .initialized
+ .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
+ .is_err()
+ {
return Ok(());
}
@@ -376,7 +391,6 @@ impl ProducerCore {
}
}
-
pub trait ErrorCallback: Send + Sync + Debug {
fn call(&self, error: IggyError, messages: Vec<IggyMessage>);
}
@@ -415,37 +429,37 @@ impl IggyProducer {
error_callback: Option<Arc<dyn ErrorCallback>>,
) -> Self {
let core = Arc::new(ProducerCore {
- initialized: AtomicBool::new(true),
- client: Arc::new(client),
- can_send: Arc::new(AtomicBool::new(true)),
- stream_id: Arc::new(stream),
- stream_name,
- topic_id: Arc::new(topic),
- topic_name,
- batch_length,
- partitioning: partitioning.map(Arc::new),
- encryptor,
- partitioner,
- linger_time_micros: linger_time.map_or(0, |i| i.as_micros()),
- create_stream_if_not_exists,
- create_topic_if_not_exists,
- topic_partitions_count,
- topic_replication_factor,
- topic_message_expiry,
- topic_max_size,
- default_partitioning: Arc::new(Partitioning::balanced()),
- can_send_immediately: linger_time.is_none(),
- last_sent_at: Arc::new(AtomicU64::new(0)),
- send_retries_count,
- send_retries_interval,
- _join_handle: None,
- sema: Arc::new(Semaphore::new(10)),
- sender: None,
- error_callback,
- shard_number: default_shard_count(),
- });
+ initialized: AtomicBool::new(true),
+ client: Arc::new(client),
+ can_send: Arc::new(AtomicBool::new(true)),
+ stream_id: Arc::new(stream),
+ stream_name,
+ topic_id: Arc::new(topic),
+ topic_name,
+ batch_length,
+ partitioning: partitioning.map(Arc::new),
+ encryptor,
+ partitioner,
+ linger_time_micros: linger_time.map_or(0, |i| i.as_micros()),
+ create_stream_if_not_exists,
+ create_topic_if_not_exists,
+ topic_partitions_count,
+ topic_replication_factor,
+ topic_message_expiry,
+ topic_max_size,
+ default_partitioning: Arc::new(Partitioning::balanced()),
+ can_send_immediately: linger_time.is_none(),
+ last_sent_at: Arc::new(AtomicU64::new(0)),
+ send_retries_count,
+ send_retries_interval,
+ _join_handle: None,
+ sema: Arc::new(Semaphore::new(10)),
+ sender: None,
+ error_callback,
+ shard_number: default_shard_count(),
+ });
let num_shards = default_shard_count(); // todo потом заменмить на
норм значепние
- let config = BackgroundConfig{
+ let config = BackgroundConfig {
max_in_flight: 4,
in_flight_timeout: None,
batch_size: None,
@@ -489,24 +503,23 @@ impl IggyProducer {
match &self.send_mode {
SendMode::Sync => {
- self.core.send_internal(&stream_id, &topic_id, messages,
None).await
-
- }
- SendMode::Background => {
- match &self.dispatcher {
- Some(disp) => {
- disp.dispatch(shardMessage{
- messages,
- stream: stream_id,
- topic: topic_id,
- partitioning: None,
- }).await.map_err(|err| IggyError::BackgroundSendError)
- }
- None => Ok(())
- }
+ self.core
+ .send_internal(&stream_id, &topic_id, messages, None)
+ .await
}
+ SendMode::Background => match &self.dispatcher {
+ Some(disp) => disp
+ .dispatch(shardMessage {
+ messages,
+ stream: stream_id,
+ topic: topic_id,
+ partitioning: None,
+ })
+ .await
+ .map_err(|err| IggyError::BackgroundSendError),
+ None => Ok(()),
+ },
}
-
}
pub async fn send_one(&self, message: IggyMessage) -> Result<(),
IggyError> {
@@ -526,7 +539,9 @@ impl IggyProducer {
let stream_id = &self.core.stream_id;
let topic_id = &self.core.topic_id;
- self.core.send_internal(stream_id, topic_id, messages,
partitioning).await
+ self.core
+ .send_internal(stream_id, topic_id, messages, partitioning)
+ .await
}
pub async fn send_to(
@@ -541,7 +556,9 @@ impl IggyProducer {
return Ok(());
}
- self.core.send_internal(&stream, &topic, messages, partitioning).await
+ self.core
+ .send_internal(&stream, &topic, messages, partitioning)
+ .await
}
}
@@ -549,11 +566,3 @@ fn default_shard_count() -> usize {
let cpus = num_cpus::get();
cpus.clamp(2, 16)
}
-
-fn default_sharder(msg: &IggyMessage) -> &[u8] {
- if let Some(h) = &msg.user_headers {
- return &h[..h.len().min(16)];
- }
-
- &msg.payload[..msg.payload.len().min(16)]
-}