This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch add-sync-client in repository https://gitbox.apache.org/repos/asf/iggy.git
commit dae90f3c60606db9ba430677fa119d7fa8ead2f3 Author: haze518 <[email protected]> AuthorDate: Mon Sep 22 08:34:58 2025 +0600 first step --- Cargo.lock | 2 + core/binary_protocol/Cargo.toml | 5 ++ .../src/cli/binary_client/get_client.rs | 1 + .../src/cli/binary_client/get_clients.rs | 1 + .../create_consumer_group.rs | 1 + .../delete_consumer_group.rs | 1 + .../binary_consumer_groups/get_consumer_group.rs | 1 + .../binary_consumer_groups/get_consumer_groups.rs | 1 + .../binary_consumer_offsets/get_consumer_offset.rs | 1 + .../binary_consumer_offsets/set_consumer_offset.rs | 1 + .../src/cli/binary_context/common.rs | 46 ++++++++++++++-- .../src/cli/binary_context/get_contexts.rs | 1 + .../src/cli/binary_context/use_context.rs | 1 + .../src/cli/binary_message/flush_messages.rs | 1 + .../src/cli/binary_message/poll_messages.rs | 27 +++++++--- .../src/cli/binary_message/send_messages.rs | 9 ++++ .../src/cli/binary_partitions/create_partitions.rs | 1 + .../src/cli/binary_partitions/delete_partitions.rs | 1 + .../create_personal_access_token.rs | 1 + .../delete_personal_access_tokens.rs | 1 + .../get_personal_access_tokens.rs | 1 + .../src/cli/binary_segments/delete_segments.rs | 1 + .../src/cli/binary_streams/create_stream.rs | 1 + .../src/cli/binary_streams/delete_stream.rs | 1 + .../src/cli/binary_streams/get_stream.rs | 1 + .../src/cli/binary_streams/get_streams.rs | 1 + .../src/cli/binary_streams/purge_stream.rs | 1 + .../src/cli/binary_streams/update_stream.rs | 1 + .../binary_protocol/src/cli/binary_system/login.rs | 1 + .../src/cli/binary_system/logout.rs | 1 + core/binary_protocol/src/cli/binary_system/me.rs | 1 + core/binary_protocol/src/cli/binary_system/ping.rs | 1 + .../src/cli/binary_system/snapshot.rs | 9 ++++ .../binary_protocol/src/cli/binary_system/stats.rs | 1 + .../src/cli/binary_topics/create_topic.rs | 1 + .../src/cli/binary_topics/delete_topic.rs | 1 + .../src/cli/binary_topics/get_topic.rs | 1 + .../src/cli/binary_topics/get_topics.rs | 1 + .../src/cli/binary_topics/purge_topic.rs | 1 + .../src/cli/binary_topics/update_topic.rs | 1 + .../src/cli/binary_users/change_password.rs | 1 + .../src/cli/binary_users/create_user.rs | 1 + .../src/cli/binary_users/delete_user.rs | 1 + .../src/cli/binary_users/get_user.rs | 1 + .../src/cli/binary_users/get_users.rs | 1 + .../src/cli/binary_users/update_permissions.rs | 1 + .../src/cli/binary_users/update_user.rs | 1 + core/binary_protocol/src/cli/cli_command.rs | 1 + .../src/client/binary_clients/binary_client.rs | 1 + .../src/client/binary_clients/client.rs | 1 + .../client/binary_clients/consumer_group_client.rs | 1 + .../binary_clients/consumer_offset_client.rs | 1 + .../src/client/binary_clients/message_client.rs | 1 + .../src/client/binary_clients/partition_client.rs | 1 + .../binary_clients/personal_access_token_client.rs | 1 + .../src/client/binary_clients/segment_client.rs | 1 + .../src/client/binary_clients/stream_client.rs | 1 + .../src/client/binary_clients/system_client.rs | 1 + .../src/client/binary_clients/topic_client.rs | 1 + .../src/client/binary_clients/user_client.rs | 1 + .../src/client/binary_consumer_groups/mod.rs | 1 + .../src/client/binary_consumer_offsets/mod.rs | 1 + .../src/client/binary_messages/mod.rs | 1 + .../src/client/binary_partitions/mod.rs | 1 + .../client/binary_personal_access_tokens/mod.rs | 1 + .../src/client/binary_segments/mod.rs | 1 + .../src/client/binary_streams/mod.rs | 1 + .../src/client/binary_system/mod.rs | 1 + .../src/client/binary_topics/mod.rs | 1 + .../src/client/binary_transport/mod.rs | 2 + .../binary_protocol/src/client/binary_users/mod.rs | 1 + core/binary_protocol/src/utils/auth.rs | 1 + core/common/Cargo.toml | 3 +- core/common/src/locking/mod.rs | 31 +++++++++++ core/common/src/locking/std_sync_lock.rs | 42 +++++++++++++++ core/sdk/Cargo.toml | 3 ++ core/sdk/src/client_wrappers/binary_client.rs | 1 + .../binary_consumer_group_client.rs | 14 +++++ .../binary_consumer_offset_client.rs | 1 + .../src/client_wrappers/binary_message_client.rs | 1 + .../src/client_wrappers/binary_partition_client.rs | 1 + .../binary_personal_access_token_client.rs | 1 + .../src/client_wrappers/binary_segment_client.rs | 1 + .../src/client_wrappers/binary_stream_client.rs | 1 + .../src/client_wrappers/binary_system_client.rs | 1 + .../sdk/src/client_wrappers/binary_topic_client.rs | 1 + core/sdk/src/client_wrappers/binary_user_client.rs | 1 + core/sdk/src/clients/binary_consumer_group.rs | 9 ++++ core/sdk/src/clients/binary_consumer_offset.rs | 1 + core/sdk/src/clients/binary_message.rs | 1 + core/sdk/src/clients/binary_partitions.rs | 1 + .../src/clients/binary_personal_access_tokens.rs | 1 + core/sdk/src/clients/binary_segments.rs | 1 + core/sdk/src/clients/binary_streams.rs | 1 + core/sdk/src/clients/binary_system.rs | 1 + core/sdk/src/clients/binary_topics.rs | 1 + core/sdk/src/clients/binary_users.rs | 1 + core/sdk/src/lib.rs | 2 + core/sdk/src/prelude.rs | 5 ++ .../src/stream_builder/build/build_iggy_client.rs | 1 + .../stream_builder/build/build_iggy_consumer.rs | 1 + .../stream_builder/build/build_iggy_producer.rs | 1 + .../src/stream_builder/build/build_stream_topic.rs | 1 + core/sdk/src/stream_builder/iggy_stream.rs | 3 ++ .../sdk/src/stream_builder/iggy_stream_consumer.rs | 3 ++ .../sdk/src/stream_builder/iggy_stream_producer.rs | 3 ++ core/sdk/src/tcp/mod.rs | 7 ++- core/sdk/src/tcp/tcp_client.rs | 4 +- core/sdk/src/tcp/tcp_client_sync.rs | 62 +++++++++------------- 109 files changed, 327 insertions(+), 52 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dc1ec265..893ef40d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3802,6 +3802,7 @@ dependencies = [ "futures-util", "iggy_binary_protocol", "iggy_common", + "maybe-async", "mockall", "num_cpus", "quinn", @@ -3930,6 +3931,7 @@ dependencies = [ "dirs", "iggy_common", "keyring", + "maybe-async", "passterm", "serde", "serde_json", diff --git a/core/binary_protocol/Cargo.toml b/core/binary_protocol/Cargo.toml index 9a7f57ec..9f3778c2 100644 --- a/core/binary_protocol/Cargo.toml +++ b/core/binary_protocol/Cargo.toml @@ -28,6 +28,10 @@ documentation = "https://iggy.apache.org/docs" repository = "https://github.com/apache/iggy" readme = "../../README.md" +[features] +sync = ["maybe-async/is_sync"] +async = [] + [dependencies] anyhow = { workspace = true } async-broadcast = { workspace = true } @@ -44,3 +48,4 @@ serde_json = { workspace = true } tokio = { workspace = true } toml = { workspace = true } tracing = { workspace = true } +maybe-async = "0.2.10" diff --git a/core/binary_protocol/src/cli/binary_client/get_client.rs b/core/binary_protocol/src/cli/binary_client/get_client.rs index 50237424..6b84c5ba 100644 --- a/core/binary_protocol/src/cli/binary_client/get_client.rs +++ b/core/binary_protocol/src/cli/binary_client/get_client.rs @@ -36,6 +36,7 @@ impl GetClientCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for GetClientCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_client/get_clients.rs b/core/binary_protocol/src/cli/binary_client/get_clients.rs index d956bb6b..1cdc0fe6 100644 --- a/core/binary_protocol/src/cli/binary_client/get_clients.rs +++ b/core/binary_protocol/src/cli/binary_client/get_clients.rs @@ -52,6 +52,7 @@ impl Default for GetClientsCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for GetClientsCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_consumer_groups/create_consumer_group.rs b/core/binary_protocol/src/cli/binary_consumer_groups/create_consumer_group.rs index e206b070..cc924ae0 100644 --- a/core/binary_protocol/src/cli/binary_consumer_groups/create_consumer_group.rs +++ b/core/binary_protocol/src/cli/binary_consumer_groups/create_consumer_group.rs @@ -53,6 +53,7 @@ impl CreateConsumerGroupCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for CreateConsumerGroupCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_consumer_groups/delete_consumer_group.rs b/core/binary_protocol/src/cli/binary_consumer_groups/delete_consumer_group.rs index bd113d7d..92fbd4ee 100644 --- a/core/binary_protocol/src/cli/binary_consumer_groups/delete_consumer_group.rs +++ b/core/binary_protocol/src/cli/binary_consumer_groups/delete_consumer_group.rs @@ -40,6 +40,7 @@ impl DeleteConsumerGroupCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for DeleteConsumerGroupCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_consumer_groups/get_consumer_group.rs b/core/binary_protocol/src/cli/binary_consumer_groups/get_consumer_group.rs index c81dabc2..8f44db2c 100644 --- a/core/binary_protocol/src/cli/binary_consumer_groups/get_consumer_group.rs +++ b/core/binary_protocol/src/cli/binary_consumer_groups/get_consumer_group.rs @@ -41,6 +41,7 @@ impl GetConsumerGroupCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for GetConsumerGroupCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_consumer_groups/get_consumer_groups.rs b/core/binary_protocol/src/cli/binary_consumer_groups/get_consumer_groups.rs index 8b5f88b8..a799baf7 100644 --- a/core/binary_protocol/src/cli/binary_consumer_groups/get_consumer_groups.rs +++ b/core/binary_protocol/src/cli/binary_consumer_groups/get_consumer_groups.rs @@ -63,6 +63,7 @@ impl GetConsumerGroupsCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for GetConsumerGroupsCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_consumer_offsets/get_consumer_offset.rs b/core/binary_protocol/src/cli/binary_consumer_offsets/get_consumer_offset.rs index 3b5bf893..540bbf09 100644 --- a/core/binary_protocol/src/cli/binary_consumer_offsets/get_consumer_offset.rs +++ b/core/binary_protocol/src/cli/binary_consumer_offsets/get_consumer_offset.rs @@ -62,6 +62,7 @@ impl GetConsumerOffsetCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for GetConsumerOffsetCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_consumer_offsets/set_consumer_offset.rs b/core/binary_protocol/src/cli/binary_consumer_offsets/set_consumer_offset.rs index 3c200a02..9b87543d 100644 --- a/core/binary_protocol/src/cli/binary_consumer_offsets/set_consumer_offset.rs +++ b/core/binary_protocol/src/cli/binary_consumer_offsets/set_consumer_offset.rs @@ -52,6 +52,7 @@ impl SetConsumerOffsetCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for SetConsumerOffsetCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_context/common.rs b/core/binary_protocol/src/cli/binary_context/common.rs index 1d96e5ae..e4b1996f 100644 --- a/core/binary_protocol/src/cli/binary_context/common.rs +++ b/core/binary_protocol/src/cli/binary_context/common.rs @@ -19,7 +19,8 @@ use anyhow::{Context, Result, bail}; use dirs::home_dir; use serde::{Deserialize, Serialize}; -use std::path::PathBuf; +use std::io; +use std::path::{Path, PathBuf}; use std::{collections::HashMap, env::var, path}; use tokio::join; @@ -33,6 +34,24 @@ pub(crate) static DEFAULT_CONTEXT_NAME: &str = "default"; pub type ContextsConfigMap = HashMap<String, ContextConfig>; +#[maybe_async::maybe_async] +async fn read_to_string<P: AsRef<Path>>(path: P) -> Result<String, io::Error> { + #[cfg(not(feature = "sync"))] + return tokio::fs::read_to_string(path).await; + + #[cfg(feature = "sync")] + return std::fs::read_to_string(path); +} + +#[maybe_async::maybe_async] +async fn write<P: AsRef<Path>, C: AsRef<[u8]>>(path: P, contents: C) -> Result<(), io::Error> { + #[cfg(not(feature = "sync"))] + return tokio::fs::write(path, contents).await; + + #[cfg(feature = "sync")] + return std::fs::write(path, contents); +} + #[derive(Deserialize, Serialize, Clone, Debug, Default)] pub struct ContextConfig { #[serde(skip_serializing_if = "Option::is_none")] @@ -84,6 +103,7 @@ impl ContextManager { } } + #[maybe_async::maybe_async] pub async fn get_active_context(&mut self) -> Result<ContextConfig> { let active_context_key = self.get_active_context_key().await?; let contexts = self.get_contexts().await?; @@ -95,6 +115,7 @@ impl ContextManager { Ok(active_context.clone()) } + #[maybe_async::maybe_async] pub async fn set_active_context_key(&mut self, context_name: &str) -> Result<()> { self.get_context_state().await?; let cs = self.context_state.take().unwrap(); @@ -116,23 +137,34 @@ impl ContextManager { Ok(()) } + #[maybe_async::maybe_async] pub async fn get_active_context_key(&mut self) -> Result<String> { let context_state = self.get_context_state().await?; Ok(context_state.active_context.clone()) } + #[maybe_async::maybe_async] pub async fn get_contexts(&mut self) -> Result<ContextsConfigMap> { let context_state = self.get_context_state().await?; Ok(context_state.contexts.clone()) } + #[maybe_async::maybe_async] async fn get_context_state(&mut self) -> Result<&ContextState> { if self.context_state.is_none() { + + #[cfg(not(feature = "sync"))] let (active_context_res, contexts_res) = join!( self.context_rw.read_active_context(), self.context_rw.read_contexts() ); + #[cfg(feature = "sync")] + let (active_context_res, contexts_res) = ( + self.context_rw.read_active_context(), + self.context_rw.read_contexts() + ); + let (maybe_active_context, maybe_contexts) = active_context_res .and_then(|a| contexts_res.map(|b| (a, b))) .context("could not read context state")?; @@ -170,11 +202,12 @@ impl ContextReaderWriter { Self { iggy_home } } + #[maybe_async::maybe_async] pub async fn read_contexts(&self) -> Result<Option<ContextsConfigMap>> { let maybe_contexts_path = &self.contexts_path(); if let Some(contexts_path) = maybe_contexts_path { - let maybe_contents = tokio::fs::read_to_string(contexts_path) + let maybe_contents = read_to_string(contexts_path) .await .map(Some) .or_else(|err| { @@ -205,6 +238,7 @@ impl ContextReaderWriter { } } + #[maybe_async::maybe_async] pub async fn write_contexts(&self, contexts: ContextsConfigMap) -> Result<()> { let maybe_contexts_path = self.contexts_path(); @@ -214,17 +248,18 @@ impl ContextReaderWriter { contexts_path.display() ))?; - tokio::fs::write(contexts_path, contents).await?; + write(contexts_path, contents).await?; } Ok(()) } + #[maybe_async::maybe_async] pub async fn read_active_context(&self) -> Result<Option<String>> { let maybe_active_context_path = self.active_context_path(); if let Some(active_context_path) = maybe_active_context_path { - tokio::fs::read_to_string(active_context_path.clone()) + read_to_string(active_context_path.clone()) .await .map(Some) .or_else(|err| { @@ -243,11 +278,12 @@ impl ContextReaderWriter { } } + #[maybe_async::maybe_async] pub async fn write_active_context(&self, context_name: &str) -> Result<()> { let maybe_active_context_path = self.active_context_path(); if let Some(active_context_path) = maybe_active_context_path { - tokio::fs::write(active_context_path.clone(), context_name) + write(active_context_path.clone(), context_name) .await .context(format!( "failed writing active context file {}", diff --git a/core/binary_protocol/src/cli/binary_context/get_contexts.rs b/core/binary_protocol/src/cli/binary_context/get_contexts.rs index edcb2ccf..c5f449a0 100644 --- a/core/binary_protocol/src/cli/binary_context/get_contexts.rs +++ b/core/binary_protocol/src/cli/binary_context/get_contexts.rs @@ -56,6 +56,7 @@ impl Default for GetContextsCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for GetContextsCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_context/use_context.rs b/core/binary_protocol/src/cli/binary_context/use_context.rs index ff88b57c..885aa328 100644 --- a/core/binary_protocol/src/cli/binary_context/use_context.rs +++ b/core/binary_protocol/src/cli/binary_context/use_context.rs @@ -42,6 +42,7 @@ impl Default for UseContextCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for UseContextCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_message/flush_messages.rs b/core/binary_protocol/src/cli/binary_message/flush_messages.rs index 7551ebbf..55eb1e5f 100644 --- a/core/binary_protocol/src/cli/binary_message/flush_messages.rs +++ b/core/binary_protocol/src/cli/binary_message/flush_messages.rs @@ -46,6 +46,7 @@ impl FlushMessagesCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for FlushMessagesCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_message/poll_messages.rs b/core/binary_protocol/src/cli/binary_message/poll_messages.rs index d4867a4a..bb2560f7 100644 --- a/core/binary_protocol/src/cli/binary_message/poll_messages.rs +++ b/core/binary_protocol/src/cli/binary_message/poll_messages.rs @@ -28,6 +28,7 @@ use iggy_common::{ use std::collections::{HashMap, HashSet}; use tokio::io::AsyncWriteExt; use tracing::{Level, event}; +use std::io::Write; pub struct PollMessagesCmd { poll_messages: PollMessages, @@ -160,6 +161,7 @@ impl PollMessagesCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for PollMessagesCmd { fn explain(&self) -> String { @@ -215,12 +217,25 @@ impl CliCommand for PollMessagesCmd { event!(target: PRINT_TARGET, Level::INFO, "Storing messages to {output_file} binary file"); let mut saved_size = IggyByteSize::default(); - let mut file = tokio::fs::OpenOptions::new() - .append(true) - .create(true) - .open(output_file) - .await - .with_context(|| format!("Problem opening file for writing: {output_file}"))?; + + #[cfg(not(feature = "sync"))] + let mut file = { + tokio::fs::OpenOptions::new() + .append(true) + .create(true) + .open(output_file) + .await + .with_context(|| format!("Problem opening file for writing: {output_file}"))? + }; + + #[cfg(feature = "sync")] + let mut file = { + std::fs::OpenOptions::new() + .append(true) + .create(true) + .open(output_file) + .with_context(|| format!("Problem opening file for writing: {output_file}"))? + }; for message in polled_messages.messages.iter() { let message = message.to_bytes(); diff --git a/core/binary_protocol/src/cli/binary_message/send_messages.rs b/core/binary_protocol/src/cli/binary_message/send_messages.rs index 6a51a66e..c5d9f124 100644 --- a/core/binary_protocol/src/cli/binary_message/send_messages.rs +++ b/core/binary_protocol/src/cli/binary_message/send_messages.rs @@ -83,6 +83,7 @@ impl SendMessagesCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for SendMessagesCmd { fn explain(&self) -> String { @@ -94,11 +95,19 @@ impl CliCommand for SendMessagesCmd { async fn execute_cmd(&mut self, client: &dyn Client) -> anyhow::Result<(), anyhow::Error> { let mut messages = if let Some(input_file) = &self.input_file { + #[cfg(not(feature = "sync"))] let mut file = tokio::fs::OpenOptions::new() .read(true) .open(input_file) .await .with_context(|| format!("Problem opening file for reading: {input_file}"))?; + + #[cfg(feature = "sync")] + let mut file = std::fs::OpenOptions::new() + .read(true) + .open(input_file) + .with_context(|| format!("Problem opening file for reading: {input_file}"))?; + let mut buffer = Vec::new(); file.read_to_end(&mut buffer) .await diff --git a/core/binary_protocol/src/cli/binary_partitions/create_partitions.rs b/core/binary_protocol/src/cli/binary_partitions/create_partitions.rs index 0ddce4dd..508f0d18 100644 --- a/core/binary_protocol/src/cli/binary_partitions/create_partitions.rs +++ b/core/binary_protocol/src/cli/binary_partitions/create_partitions.rs @@ -40,6 +40,7 @@ impl CreatePartitionsCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for CreatePartitionsCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_partitions/delete_partitions.rs b/core/binary_protocol/src/cli/binary_partitions/delete_partitions.rs index a05d6a07..eb9cbcd6 100644 --- a/core/binary_protocol/src/cli/binary_partitions/delete_partitions.rs +++ b/core/binary_protocol/src/cli/binary_partitions/delete_partitions.rs @@ -40,6 +40,7 @@ impl DeletePartitionsCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for DeletePartitionsCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_personal_access_tokens/create_personal_access_token.rs b/core/binary_protocol/src/cli/binary_personal_access_tokens/create_personal_access_token.rs index e6ff07ea..0e443283 100644 --- a/core/binary_protocol/src/cli/binary_personal_access_tokens/create_personal_access_token.rs +++ b/core/binary_protocol/src/cli/binary_personal_access_tokens/create_personal_access_token.rs @@ -57,6 +57,7 @@ impl CreatePersonalAccessTokenCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for CreatePersonalAccessTokenCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_personal_access_tokens/delete_personal_access_tokens.rs b/core/binary_protocol/src/cli/binary_personal_access_tokens/delete_personal_access_tokens.rs index d8ca5ce4..7202ff53 100644 --- a/core/binary_protocol/src/cli/binary_personal_access_tokens/delete_personal_access_tokens.rs +++ b/core/binary_protocol/src/cli/binary_personal_access_tokens/delete_personal_access_tokens.rs @@ -38,6 +38,7 @@ impl DeletePersonalAccessTokenCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for DeletePersonalAccessTokenCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_personal_access_tokens/get_personal_access_tokens.rs b/core/binary_protocol/src/cli/binary_personal_access_tokens/get_personal_access_tokens.rs index 51617d99..d64924e1 100644 --- a/core/binary_protocol/src/cli/binary_personal_access_tokens/get_personal_access_tokens.rs +++ b/core/binary_protocol/src/cli/binary_personal_access_tokens/get_personal_access_tokens.rs @@ -43,6 +43,7 @@ impl GetPersonalAccessTokensCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for GetPersonalAccessTokensCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_segments/delete_segments.rs b/core/binary_protocol/src/cli/binary_segments/delete_segments.rs index 64a0be28..380cbb20 100644 --- a/core/binary_protocol/src/cli/binary_segments/delete_segments.rs +++ b/core/binary_protocol/src/cli/binary_segments/delete_segments.rs @@ -45,6 +45,7 @@ impl DeleteSegmentsCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for DeleteSegmentsCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_streams/create_stream.rs b/core/binary_protocol/src/cli/binary_streams/create_stream.rs index 46ed1d74..8b0fe4a4 100644 --- a/core/binary_protocol/src/cli/binary_streams/create_stream.rs +++ b/core/binary_protocol/src/cli/binary_streams/create_stream.rs @@ -42,6 +42,7 @@ impl CreateStreamCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for CreateStreamCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_streams/delete_stream.rs b/core/binary_protocol/src/cli/binary_streams/delete_stream.rs index b1cf60ff..929cb9bf 100644 --- a/core/binary_protocol/src/cli/binary_streams/delete_stream.rs +++ b/core/binary_protocol/src/cli/binary_streams/delete_stream.rs @@ -36,6 +36,7 @@ impl DeleteStreamCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for DeleteStreamCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_streams/get_stream.rs b/core/binary_protocol/src/cli/binary_streams/get_stream.rs index 64b4512e..19424ac0 100644 --- a/core/binary_protocol/src/cli/binary_streams/get_stream.rs +++ b/core/binary_protocol/src/cli/binary_streams/get_stream.rs @@ -37,6 +37,7 @@ impl GetStreamCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for GetStreamCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_streams/get_streams.rs b/core/binary_protocol/src/cli/binary_streams/get_streams.rs index 15926a86..0db75c73 100644 --- a/core/binary_protocol/src/cli/binary_streams/get_streams.rs +++ b/core/binary_protocol/src/cli/binary_streams/get_streams.rs @@ -52,6 +52,7 @@ impl Default for GetStreamsCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for GetStreamsCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_streams/purge_stream.rs b/core/binary_protocol/src/cli/binary_streams/purge_stream.rs index e633faa8..b7cc6e83 100644 --- a/core/binary_protocol/src/cli/binary_streams/purge_stream.rs +++ b/core/binary_protocol/src/cli/binary_streams/purge_stream.rs @@ -36,6 +36,7 @@ impl PurgeStreamCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for PurgeStreamCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_streams/update_stream.rs b/core/binary_protocol/src/cli/binary_streams/update_stream.rs index 246ab5b1..d19efd65 100644 --- a/core/binary_protocol/src/cli/binary_streams/update_stream.rs +++ b/core/binary_protocol/src/cli/binary_streams/update_stream.rs @@ -36,6 +36,7 @@ impl UpdateStreamCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for UpdateStreamCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_system/login.rs b/core/binary_protocol/src/cli/binary_system/login.rs index 1ec321c1..7d582bfd 100644 --- a/core/binary_protocol/src/cli/binary_system/login.rs +++ b/core/binary_protocol/src/cli/binary_system/login.rs @@ -41,6 +41,7 @@ impl LoginCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for LoginCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_system/logout.rs b/core/binary_protocol/src/cli/binary_system/logout.rs index 8466dd73..12bc8b75 100644 --- a/core/binary_protocol/src/cli/binary_system/logout.rs +++ b/core/binary_protocol/src/cli/binary_system/logout.rs @@ -35,6 +35,7 @@ impl LogoutCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for LogoutCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_system/me.rs b/core/binary_protocol/src/cli/binary_system/me.rs index fb6cfc75..cb71579e 100644 --- a/core/binary_protocol/src/cli/binary_system/me.rs +++ b/core/binary_protocol/src/cli/binary_system/me.rs @@ -40,6 +40,7 @@ impl Default for GetMeCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for GetMeCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_system/ping.rs b/core/binary_protocol/src/cli/binary_system/ping.rs index 5718691a..9408711b 100644 --- a/core/binary_protocol/src/cli/binary_system/ping.rs +++ b/core/binary_protocol/src/cli/binary_system/ping.rs @@ -106,6 +106,7 @@ impl Display for PingStats { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for PingCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_system/snapshot.rs b/core/binary_protocol/src/cli/binary_system/snapshot.rs index 6a97b8c8..6c4074e1 100644 --- a/core/binary_protocol/src/cli/binary_system/snapshot.rs +++ b/core/binary_protocol/src/cli/binary_system/snapshot.rs @@ -16,6 +16,7 @@ * under the License. */ +use std::io::Write; use std::path::Path; use crate::Client; @@ -64,6 +65,7 @@ impl Default for GetSnapshotCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for GetSnapshotCmd { fn explain(&self) -> String { @@ -84,10 +86,17 @@ impl CliCommand for GetSnapshotCmd { )); let file_size = snapshot_data.0.len(); + #[cfg(not(feature = "sync"))] let mut file = tokio::fs::File::create(&file_path) .await .with_context(|| format!("Failed to create file at {file_path:?}"))?; + + #[cfg(feature = "sync")] + let mut file = std::fs::File::create(&file_path) + .await + .with_context(|| format!("Failed to create file at {file_path:?}"))?; + file.write_all(&snapshot_data.0) .await .with_context(|| "Failed to write snapshot data to file".to_owned())?; diff --git a/core/binary_protocol/src/cli/binary_system/stats.rs b/core/binary_protocol/src/cli/binary_system/stats.rs index f61cf496..14480c8c 100644 --- a/core/binary_protocol/src/cli/binary_system/stats.rs +++ b/core/binary_protocol/src/cli/binary_system/stats.rs @@ -61,6 +61,7 @@ impl GetStatsCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for GetStatsCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_topics/create_topic.rs b/core/binary_protocol/src/cli/binary_topics/create_topic.rs index 56a82387..658ce608 100644 --- a/core/binary_protocol/src/cli/binary_topics/create_topic.rs +++ b/core/binary_protocol/src/cli/binary_topics/create_topic.rs @@ -69,6 +69,7 @@ impl CreateTopicCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for CreateTopicCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_topics/delete_topic.rs b/core/binary_protocol/src/cli/binary_topics/delete_topic.rs index e48665b5..25a24a46 100644 --- a/core/binary_protocol/src/cli/binary_topics/delete_topic.rs +++ b/core/binary_protocol/src/cli/binary_topics/delete_topic.rs @@ -39,6 +39,7 @@ impl DeleteTopicCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for DeleteTopicCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_topics/get_topic.rs b/core/binary_protocol/src/cli/binary_topics/get_topic.rs index 29360506..b86b02a1 100644 --- a/core/binary_protocol/src/cli/binary_topics/get_topic.rs +++ b/core/binary_protocol/src/cli/binary_topics/get_topic.rs @@ -41,6 +41,7 @@ impl GetTopicCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for GetTopicCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_topics/get_topics.rs b/core/binary_protocol/src/cli/binary_topics/get_topics.rs index 2f6c2089..e7899831 100644 --- a/core/binary_protocol/src/cli/binary_topics/get_topics.rs +++ b/core/binary_protocol/src/cli/binary_topics/get_topics.rs @@ -56,6 +56,7 @@ impl GetTopicsCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for GetTopicsCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_topics/purge_topic.rs b/core/binary_protocol/src/cli/binary_topics/purge_topic.rs index 65447f77..b5723b49 100644 --- a/core/binary_protocol/src/cli/binary_topics/purge_topic.rs +++ b/core/binary_protocol/src/cli/binary_topics/purge_topic.rs @@ -39,6 +39,7 @@ impl PurgeTopicCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for PurgeTopicCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_topics/update_topic.rs b/core/binary_protocol/src/cli/binary_topics/update_topic.rs index c9b8da2d..24186e91 100644 --- a/core/binary_protocol/src/cli/binary_topics/update_topic.rs +++ b/core/binary_protocol/src/cli/binary_topics/update_topic.rs @@ -59,6 +59,7 @@ impl UpdateTopicCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for UpdateTopicCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_users/change_password.rs b/core/binary_protocol/src/cli/binary_users/change_password.rs index a64e8be3..64216fc0 100644 --- a/core/binary_protocol/src/cli/binary_users/change_password.rs +++ b/core/binary_protocol/src/cli/binary_users/change_password.rs @@ -48,6 +48,7 @@ impl ChangePasswordCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for ChangePasswordCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_users/create_user.rs b/core/binary_protocol/src/cli/binary_users/create_user.rs index 5054f7c0..8b2ea835 100644 --- a/core/binary_protocol/src/cli/binary_users/create_user.rs +++ b/core/binary_protocol/src/cli/binary_users/create_user.rs @@ -47,6 +47,7 @@ impl CreateUserCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for CreateUserCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_users/delete_user.rs b/core/binary_protocol/src/cli/binary_users/delete_user.rs index 23e027d6..26c678d0 100644 --- a/core/binary_protocol/src/cli/binary_users/delete_user.rs +++ b/core/binary_protocol/src/cli/binary_users/delete_user.rs @@ -36,6 +36,7 @@ impl DeleteUserCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for DeleteUserCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_users/get_user.rs b/core/binary_protocol/src/cli/binary_users/get_user.rs index 13b2f07c..54833875 100644 --- a/core/binary_protocol/src/cli/binary_users/get_user.rs +++ b/core/binary_protocol/src/cli/binary_users/get_user.rs @@ -37,6 +37,7 @@ impl GetUserCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for GetUserCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_users/get_users.rs b/core/binary_protocol/src/cli/binary_users/get_users.rs index 72f327bd..d57baa11 100644 --- a/core/binary_protocol/src/cli/binary_users/get_users.rs +++ b/core/binary_protocol/src/cli/binary_users/get_users.rs @@ -52,6 +52,7 @@ impl Default for GetUsersCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for GetUsersCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_users/update_permissions.rs b/core/binary_protocol/src/cli/binary_users/update_permissions.rs index 395983da..741404af 100644 --- a/core/binary_protocol/src/cli/binary_users/update_permissions.rs +++ b/core/binary_protocol/src/cli/binary_users/update_permissions.rs @@ -39,6 +39,7 @@ impl UpdatePermissionsCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for UpdatePermissionsCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/binary_users/update_user.rs b/core/binary_protocol/src/cli/binary_users/update_user.rs index 16444c8e..17026cbe 100644 --- a/core/binary_protocol/src/cli/binary_users/update_user.rs +++ b/core/binary_protocol/src/cli/binary_users/update_user.rs @@ -61,6 +61,7 @@ impl UpdateUserCmd { } } +#[maybe_async::maybe_async] #[async_trait] impl CliCommand for UpdateUserCmd { fn explain(&self) -> String { diff --git a/core/binary_protocol/src/cli/cli_command.rs b/core/binary_protocol/src/cli/cli_command.rs index 41d440b6..a50f6342 100644 --- a/core/binary_protocol/src/cli/cli_command.rs +++ b/core/binary_protocol/src/cli/cli_command.rs @@ -22,6 +22,7 @@ use async_trait::async_trait; pub static PRINT_TARGET: &str = "iggy::cli::output"; +#[maybe_async::maybe_async] #[async_trait] pub trait CliCommand { fn explain(&self) -> String; diff --git a/core/binary_protocol/src/client/binary_clients/binary_client.rs b/core/binary_protocol/src/client/binary_clients/binary_client.rs index b0d5f29f..35926acd 100644 --- a/core/binary_protocol/src/client/binary_clients/binary_client.rs +++ b/core/binary_protocol/src/client/binary_clients/binary_client.rs @@ -21,5 +21,6 @@ use crate::client::binary_clients::client::Client; use async_trait::async_trait; /// A client that can send and receive binary messages. +#[maybe_async::maybe_async] #[async_trait] pub trait BinaryClient: BinaryTransport + Client {} diff --git a/core/binary_protocol/src/client/binary_clients/client.rs b/core/binary_protocol/src/client/binary_clients/client.rs index 64c2f87b..6e9f4d41 100644 --- a/core/binary_protocol/src/client/binary_clients/client.rs +++ b/core/binary_protocol/src/client/binary_clients/client.rs @@ -28,6 +28,7 @@ use std::fmt::Debug; /// The client trait which is the main interface to the Iggy server. /// It consists of multiple modules, each of which is responsible for a specific set of commands. /// Except the ping, login and get me, all the other methods require authentication. +#[maybe_async::maybe_async(AFIT)] #[async_trait] pub trait Client: SystemClient diff --git a/core/binary_protocol/src/client/binary_clients/consumer_group_client.rs b/core/binary_protocol/src/client/binary_clients/consumer_group_client.rs index d379ca56..1a9b811e 100644 --- a/core/binary_protocol/src/client/binary_clients/consumer_group_client.rs +++ b/core/binary_protocol/src/client/binary_clients/consumer_group_client.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use iggy_common::{ConsumerGroup, ConsumerGroupDetails, Identifier, IggyError}; /// This trait defines the methods to interact with the consumer group module. +#[maybe_async::maybe_async] #[async_trait] pub trait ConsumerGroupClient { /// Get the info about a specific consumer group by unique ID or name for the given stream and topic by unique IDs or names. diff --git a/core/binary_protocol/src/client/binary_clients/consumer_offset_client.rs b/core/binary_protocol/src/client/binary_clients/consumer_offset_client.rs index 07b33e7c..f140a9e0 100644 --- a/core/binary_protocol/src/client/binary_clients/consumer_offset_client.rs +++ b/core/binary_protocol/src/client/binary_clients/consumer_offset_client.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use iggy_common::{Consumer, ConsumerOffsetInfo, Identifier, IggyError}; /// This trait defines the methods to interact with the consumer offset module. +#[maybe_async::maybe_async] #[async_trait] pub trait ConsumerOffsetClient { /// Store the consumer offset for a specific consumer or consumer group for the given stream and topic by unique IDs or names. diff --git a/core/binary_protocol/src/client/binary_clients/message_client.rs b/core/binary_protocol/src/client/binary_clients/message_client.rs index 95bc9511..c626d3d8 100644 --- a/core/binary_protocol/src/client/binary_clients/message_client.rs +++ b/core/binary_protocol/src/client/binary_clients/message_client.rs @@ -21,6 +21,7 @@ use iggy_common::{ }; /// This trait defines the methods to interact with the messaging module. +#[maybe_async::maybe_async] #[async_trait] pub trait MessageClient { /// Poll given amount of messages using the specified consumer and strategy from the specified stream and topic by unique IDs or names. diff --git a/core/binary_protocol/src/client/binary_clients/partition_client.rs b/core/binary_protocol/src/client/binary_clients/partition_client.rs index 7f7324d9..20e2f933 100644 --- a/core/binary_protocol/src/client/binary_clients/partition_client.rs +++ b/core/binary_protocol/src/client/binary_clients/partition_client.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use iggy_common::{Identifier, IggyError}; /// This trait defines the methods to interact with the partition module. +#[maybe_async::maybe_async] #[async_trait] pub trait PartitionClient { /// Create new N partitions for a topic by unique ID or name. diff --git a/core/binary_protocol/src/client/binary_clients/personal_access_token_client.rs b/core/binary_protocol/src/client/binary_clients/personal_access_token_client.rs index fe2e90fe..50f7ddc5 100644 --- a/core/binary_protocol/src/client/binary_clients/personal_access_token_client.rs +++ b/core/binary_protocol/src/client/binary_clients/personal_access_token_client.rs @@ -23,6 +23,7 @@ use iggy_common::{ }; /// This trait defines the methods to interact with the personal access token module. +#[maybe_async::maybe_async] #[async_trait] pub trait PersonalAccessTokenClient { /// Get the info about all the personal access tokens of the currently authenticated user. diff --git a/core/binary_protocol/src/client/binary_clients/segment_client.rs b/core/binary_protocol/src/client/binary_clients/segment_client.rs index 3e3ba0d1..1001ae65 100644 --- a/core/binary_protocol/src/client/binary_clients/segment_client.rs +++ b/core/binary_protocol/src/client/binary_clients/segment_client.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use iggy_common::{Identifier, IggyError}; /// This trait defines the methods to interact with the partition module. +#[maybe_async::maybe_async] #[async_trait] pub trait SegmentClient { /// Delete last N segments for a partition by unique ID or name. diff --git a/core/binary_protocol/src/client/binary_clients/stream_client.rs b/core/binary_protocol/src/client/binary_clients/stream_client.rs index bf4a7db4..91b2782e 100644 --- a/core/binary_protocol/src/client/binary_clients/stream_client.rs +++ b/core/binary_protocol/src/client/binary_clients/stream_client.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use iggy_common::{Identifier, IggyError, Stream, StreamDetails}; /// This trait defines the methods to interact with the stream module. +#[maybe_async::maybe_async] #[async_trait] pub trait StreamClient { /// Get the info about a specific stream by unique ID or name. diff --git a/core/binary_protocol/src/client/binary_clients/system_client.rs b/core/binary_protocol/src/client/binary_clients/system_client.rs index 93767362..65ce5498 100644 --- a/core/binary_protocol/src/client/binary_clients/system_client.rs +++ b/core/binary_protocol/src/client/binary_clients/system_client.rs @@ -23,6 +23,7 @@ use iggy_common::{ }; /// This trait defines the methods to interact with the system module. +#[maybe_async::maybe_async] #[async_trait] pub trait SystemClient { /// Get the stats of the system such as PID, memory usage, streams count etc. diff --git a/core/binary_protocol/src/client/binary_clients/topic_client.rs b/core/binary_protocol/src/client/binary_clients/topic_client.rs index 3e89de03..240682ff 100644 --- a/core/binary_protocol/src/client/binary_clients/topic_client.rs +++ b/core/binary_protocol/src/client/binary_clients/topic_client.rs @@ -22,6 +22,7 @@ use iggy_common::{ }; /// This trait defines the methods to interact with the topic module. +#[maybe_async::maybe_async] #[allow(clippy::too_many_arguments)] #[async_trait] pub trait TopicClient { diff --git a/core/binary_protocol/src/client/binary_clients/user_client.rs b/core/binary_protocol/src/client/binary_clients/user_client.rs index bc794a1c..c1d449b1 100644 --- a/core/binary_protocol/src/client/binary_clients/user_client.rs +++ b/core/binary_protocol/src/client/binary_clients/user_client.rs @@ -22,6 +22,7 @@ use iggy_common::{ }; /// This trait defines the methods to interact with the user module. +#[maybe_async::maybe_async] #[async_trait] pub trait UserClient { /// Get the info about a specific user by unique ID or username. diff --git a/core/binary_protocol/src/client/binary_consumer_groups/mod.rs b/core/binary_protocol/src/client/binary_consumer_groups/mod.rs index bad644d3..242df9c9 100644 --- a/core/binary_protocol/src/client/binary_consumer_groups/mod.rs +++ b/core/binary_protocol/src/client/binary_consumer_groups/mod.rs @@ -28,6 +28,7 @@ use iggy_common::join_consumer_group::JoinConsumerGroup; use iggy_common::leave_consumer_group::LeaveConsumerGroup; use iggy_common::{ConsumerGroup, ConsumerGroupDetails, Identifier, IggyError}; +#[maybe_async::maybe_async] #[async_trait::async_trait] impl<B: BinaryClient> ConsumerGroupClient for B { async fn get_consumer_group( diff --git a/core/binary_protocol/src/client/binary_consumer_offsets/mod.rs b/core/binary_protocol/src/client/binary_consumer_offsets/mod.rs index 9d2ceda7..59122df5 100644 --- a/core/binary_protocol/src/client/binary_consumer_offsets/mod.rs +++ b/core/binary_protocol/src/client/binary_consumer_offsets/mod.rs @@ -24,6 +24,7 @@ use iggy_common::get_consumer_offset::GetConsumerOffset; use iggy_common::store_consumer_offset::StoreConsumerOffset; use iggy_common::{Consumer, ConsumerOffsetInfo, Identifier, IggyError}; +#[maybe_async::maybe_async] #[async_trait::async_trait] impl<B: BinaryClient> ConsumerOffsetClient for B { async fn store_consumer_offset( diff --git a/core/binary_protocol/src/client/binary_messages/mod.rs b/core/binary_protocol/src/client/binary_messages/mod.rs index cef26620..c6d562b3 100644 --- a/core/binary_protocol/src/client/binary_messages/mod.rs +++ b/core/binary_protocol/src/client/binary_messages/mod.rs @@ -23,6 +23,7 @@ use iggy_common::{ SEND_MESSAGES_CODE, SendMessages, }; +#[maybe_async::maybe_async] #[async_trait::async_trait] impl<B: BinaryClient> MessageClient for B { async fn poll_messages( diff --git a/core/binary_protocol/src/client/binary_partitions/mod.rs b/core/binary_protocol/src/client/binary_partitions/mod.rs index cbe57f26..7c65c806 100644 --- a/core/binary_protocol/src/client/binary_partitions/mod.rs +++ b/core/binary_protocol/src/client/binary_partitions/mod.rs @@ -22,6 +22,7 @@ use iggy_common::create_partitions::CreatePartitions; use iggy_common::delete_partitions::DeletePartitions; use iggy_common::{Identifier, IggyError}; +#[maybe_async::maybe_async] #[async_trait::async_trait] impl<B: BinaryClient> PartitionClient for B { async fn create_partitions( diff --git a/core/binary_protocol/src/client/binary_personal_access_tokens/mod.rs b/core/binary_protocol/src/client/binary_personal_access_tokens/mod.rs index fffca48d..58ebec8b 100644 --- a/core/binary_protocol/src/client/binary_personal_access_tokens/mod.rs +++ b/core/binary_protocol/src/client/binary_personal_access_tokens/mod.rs @@ -28,6 +28,7 @@ use iggy_common::{ RawPersonalAccessToken, }; +#[maybe_async::maybe_async] #[async_trait::async_trait] impl<B: BinaryClient> PersonalAccessTokenClient for B { async fn get_personal_access_tokens(&self) -> Result<Vec<PersonalAccessTokenInfo>, IggyError> { diff --git a/core/binary_protocol/src/client/binary_segments/mod.rs b/core/binary_protocol/src/client/binary_segments/mod.rs index f7fa840f..ad306bf9 100644 --- a/core/binary_protocol/src/client/binary_segments/mod.rs +++ b/core/binary_protocol/src/client/binary_segments/mod.rs @@ -23,6 +23,7 @@ use crate::utils::auth::fail_if_not_authenticated; use iggy_common::delete_segments::DeleteSegments; use iggy_common::{Identifier, IggyError}; +#[maybe_async::maybe_async] #[async_trait::async_trait] impl<B: BinaryClient> SegmentClient for B { async fn delete_segments( diff --git a/core/binary_protocol/src/client/binary_streams/mod.rs b/core/binary_protocol/src/client/binary_streams/mod.rs index b48a017c..c5d87bd8 100644 --- a/core/binary_protocol/src/client/binary_streams/mod.rs +++ b/core/binary_protocol/src/client/binary_streams/mod.rs @@ -27,6 +27,7 @@ use iggy_common::purge_stream::PurgeStream; use iggy_common::update_stream::UpdateStream; use iggy_common::{Identifier, IggyError, Stream, StreamDetails}; +#[maybe_async::maybe_async] #[async_trait::async_trait] impl<B: BinaryClient> StreamClient for B { async fn get_stream(&self, stream_id: &Identifier) -> Result<Option<StreamDetails>, IggyError> { diff --git a/core/binary_protocol/src/client/binary_system/mod.rs b/core/binary_protocol/src/client/binary_system/mod.rs index 4ef9ea89..520a0be9 100644 --- a/core/binary_protocol/src/client/binary_system/mod.rs +++ b/core/binary_protocol/src/client/binary_system/mod.rs @@ -30,6 +30,7 @@ use iggy_common::{ SystemSnapshotType, }; +#[maybe_async::maybe_async] #[async_trait::async_trait] impl<B: BinaryClient> SystemClient for B { async fn get_stats(&self) -> Result<Stats, IggyError> { diff --git a/core/binary_protocol/src/client/binary_topics/mod.rs b/core/binary_protocol/src/client/binary_topics/mod.rs index 5d1575b7..20014031 100644 --- a/core/binary_protocol/src/client/binary_topics/mod.rs +++ b/core/binary_protocol/src/client/binary_topics/mod.rs @@ -29,6 +29,7 @@ use iggy_common::{ CompressionAlgorithm, Identifier, IggyError, IggyExpiry, MaxTopicSize, Topic, TopicDetails, }; +#[maybe_async::maybe_async] #[async_trait::async_trait] impl<B: BinaryClient> TopicClient for B { async fn get_topic( diff --git a/core/binary_protocol/src/client/binary_transport/mod.rs b/core/binary_protocol/src/client/binary_transport/mod.rs index e15f435a..e9af5b14 100644 --- a/core/binary_protocol/src/client/binary_transport/mod.rs +++ b/core/binary_protocol/src/client/binary_transport/mod.rs @@ -19,7 +19,9 @@ use async_trait::async_trait; use bytes::Bytes; use iggy_common::{ClientState, Command, DiagnosticEvent, IggyDuration, IggyError}; +use maybe_async::maybe_async; +#[maybe_async(AFIT)] #[async_trait] pub trait BinaryTransport { /// Gets the state of the client. diff --git a/core/binary_protocol/src/client/binary_users/mod.rs b/core/binary_protocol/src/client/binary_users/mod.rs index a1b484a0..07baff79 100644 --- a/core/binary_protocol/src/client/binary_users/mod.rs +++ b/core/binary_protocol/src/client/binary_users/mod.rs @@ -33,6 +33,7 @@ use iggy_common::{ UserInfoDetails, UserStatus, }; +#[maybe_async::maybe_async] #[async_trait::async_trait] impl<B: BinaryClient> UserClient for B { async fn get_user(&self, user_id: &Identifier) -> Result<Option<UserInfoDetails>, IggyError> { diff --git a/core/binary_protocol/src/utils/auth.rs b/core/binary_protocol/src/utils/auth.rs index 8299d8ba..2fbafaa3 100644 --- a/core/binary_protocol/src/utils/auth.rs +++ b/core/binary_protocol/src/utils/auth.rs @@ -19,6 +19,7 @@ use crate::BinaryTransport; use iggy_common::{ClientState, IggyError}; +#[maybe_async::maybe_async] pub(crate) async fn fail_if_not_authenticated<T: BinaryTransport>( transport: &T, ) -> Result<(), IggyError> { diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml index 80f92ebc..ebfad449 100644 --- a/core/common/Cargo.toml +++ b/core/common/Cargo.toml @@ -28,9 +28,10 @@ repository = "https://github.com/apache/iggy" readme = "../../README.md" [features] -default = ["tokio_lock"] +default = ["std_sync_lock"] tokio_lock = [] fast_async_lock = ["dep:fast-async-mutex"] +std_sync_lock = [] # TODO add sync/async feature [dependencies] aes-gcm = { workspace = true } diff --git a/core/common/src/locking/mod.rs b/core/common/src/locking/mod.rs index 42d72ebb..858918a6 100644 --- a/core/common/src/locking/mod.rs +++ b/core/common/src/locking/mod.rs @@ -27,6 +27,12 @@ mod tokio_lock; #[cfg(feature = "fast_async_lock")] mod fast_async_lock; +#[cfg(feature = "std_sync_lock")] +mod std_sync_lock; + +#[cfg(feature = "std_sync_lock")] +pub type IggySharedMut<T> = std_sync_lock::IggyStdSyncRwLock<T>; + #[cfg(feature = "tokio_lock")] #[cfg(not(any(feature = "fast_async_lock")))] pub type IggySharedMut<T> = tokio_lock::IggyTokioRwLock<T>; @@ -35,6 +41,7 @@ pub type IggySharedMut<T> = tokio_lock::IggyTokioRwLock<T>; #[cfg(feature = "fast_async_lock")] pub type IggySharedMut<T> = fast_async_lock::IggyFastAsyncRwLock<T>; +#[cfg(not(feature = "std_sync_lock"))] #[allow(async_fn_in_trait)] pub trait IggySharedMutFn<T>: Send + Sync { type ReadGuard<'a>: Deref<Target = T> + Send @@ -58,3 +65,27 @@ pub trait IggySharedMutFn<T>: Send + Sync { where T: 'a; } + +#[cfg(feature = "std_sync_lock")] +pub trait IggySharedMutFn<T>: Sync { + type ReadGuard<'a>: Deref<Target = T> + where + T: 'a, + Self: 'a; + type WriteGuard<'a>: DerefMut<Target = T> + where + T: 'a, + Self: 'a; + + fn new(data: T) -> Self + where + Self: Sized; + + fn read<'a>(&'a self) -> Self::ReadGuard<'a> + where + T: 'a; + + fn write<'a>(&'a self) -> Self::WriteGuard<'a> + where + T: 'a; +} diff --git a/core/common/src/locking/std_sync_lock.rs b/core/common/src/locking/std_sync_lock.rs new file mode 100644 index 00000000..fc08568f --- /dev/null +++ b/core/common/src/locking/std_sync_lock.rs @@ -0,0 +1,42 @@ +use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; + +use crate::locking::IggySharedMutFn; + +#[derive(Debug)] +pub struct IggyStdSyncRwLock<T>(Arc<RwLock<T>>); + +impl<T> IggySharedMutFn<T> for IggyStdSyncRwLock<T> +where + T: Send + Sync, +{ + type ReadGuard<'a> = RwLockReadGuard<'a, T> + where + T: 'a; + type WriteGuard<'a> = RwLockWriteGuard<'a, T> + where + T: 'a; + + fn new(data: T) -> Self { + IggyStdSyncRwLock(Arc::new(RwLock::new(data))) + } + + fn read<'a>(&'a self) -> Self::ReadGuard<'a> + where + T: 'a, + { + self.0.read().unwrap() + } + + fn write<'a>(&'a self) -> Self::WriteGuard<'a> + where + T: 'a, + { + self.0.write().unwrap() + } +} + +impl<T> Clone for IggyStdSyncRwLock<T> { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } +} diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml index c6b26729..b3c55308 100644 --- a/core/sdk/Cargo.toml +++ b/core/sdk/Cargo.toml @@ -31,6 +31,8 @@ readme = "../../README.md" [features] default = ["tokio_lock"] tokio_lock = [] +sync = ["maybe-async/is_sync"] +async = [] # Moved to common crate. Not sure if this is even needed anymore #fast_async_lock = ["dep:fast-async-mutex"] @@ -48,6 +50,7 @@ futures = { workspace = true } futures-util = { workspace = true } iggy_binary_protocol = { workspace = true } iggy_common = { workspace = true } +maybe-async = "0.2.10" num_cpus = "1.17.0" quinn = { workspace = true } reqwest = { workspace = true } diff --git a/core/sdk/src/client_wrappers/binary_client.rs b/core/sdk/src/client_wrappers/binary_client.rs index 757d16a7..a4206134 100644 --- a/core/sdk/src/client_wrappers/binary_client.rs +++ b/core/sdk/src/client_wrappers/binary_client.rs @@ -22,6 +22,7 @@ use async_trait::async_trait; use iggy_binary_protocol::Client; use iggy_common::{DiagnosticEvent, IggyError}; +#[maybe_async::maybe_async] #[async_trait] impl Client for ClientWrapper { async fn connect(&self) -> Result<(), IggyError> { diff --git a/core/sdk/src/client_wrappers/binary_consumer_group_client.rs b/core/sdk/src/client_wrappers/binary_consumer_group_client.rs index 976be37c..41fbe045 100644 --- a/core/sdk/src/client_wrappers/binary_consumer_group_client.rs +++ b/core/sdk/src/client_wrappers/binary_consumer_group_client.rs @@ -22,6 +22,7 @@ use async_trait::async_trait; use iggy_binary_protocol::{ConsumerGroupClient, UserClient}; use iggy_common::{ConsumerGroup, ConsumerGroupDetails, Identifier, IggyError}; +#[maybe_async::maybe_async] #[async_trait] impl ConsumerGroupClient for ClientWrapper { async fn get_consumer_group( @@ -215,6 +216,7 @@ impl ConsumerGroupClient for ClientWrapper { } } +#[cfg(feature = "async")] #[async_trait] impl AsyncDrop for ClientWrapper { async fn async_drop(&mut self) { @@ -237,3 +239,15 @@ impl AsyncDrop for ClientWrapper { } } } + +#[cfg(feature = "sync")] +impl Drop for ClientWrapper { + fn drop(&mut self) { + match self { + ClientWrapper::TcpSync(client) => { + let _ = client.logout_user(); + } + _ => return + } + } +} diff --git a/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs b/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs index 51c6ea2b..f7c6a9fe 100644 --- a/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs +++ b/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs @@ -21,6 +21,7 @@ use async_trait::async_trait; use iggy_binary_protocol::ConsumerOffsetClient; use iggy_common::{Consumer, ConsumerOffsetInfo, Identifier, IggyError}; +#[maybe_async::maybe_async] #[async_trait] impl ConsumerOffsetClient for ClientWrapper { async fn store_consumer_offset( diff --git a/core/sdk/src/client_wrappers/binary_message_client.rs b/core/sdk/src/client_wrappers/binary_message_client.rs index 98db2924..f1908de2 100644 --- a/core/sdk/src/client_wrappers/binary_message_client.rs +++ b/core/sdk/src/client_wrappers/binary_message_client.rs @@ -23,6 +23,7 @@ use iggy_common::{ Consumer, Identifier, IggyError, IggyMessage, Partitioning, PolledMessages, PollingStrategy, }; +#[maybe_async::maybe_async] #[async_trait] impl MessageClient for ClientWrapper { async fn poll_messages( diff --git a/core/sdk/src/client_wrappers/binary_partition_client.rs b/core/sdk/src/client_wrappers/binary_partition_client.rs index ceee0ad8..fc6fcce7 100644 --- a/core/sdk/src/client_wrappers/binary_partition_client.rs +++ b/core/sdk/src/client_wrappers/binary_partition_client.rs @@ -21,6 +21,7 @@ use async_trait::async_trait; use iggy_binary_protocol::PartitionClient; use iggy_common::{Identifier, IggyError}; +#[maybe_async::maybe_async] #[async_trait] impl PartitionClient for ClientWrapper { async fn create_partitions( diff --git a/core/sdk/src/client_wrappers/binary_personal_access_token_client.rs b/core/sdk/src/client_wrappers/binary_personal_access_token_client.rs index 98dd829f..119c9204 100644 --- a/core/sdk/src/client_wrappers/binary_personal_access_token_client.rs +++ b/core/sdk/src/client_wrappers/binary_personal_access_token_client.rs @@ -24,6 +24,7 @@ use iggy_common::{ RawPersonalAccessToken, }; +#[maybe_async::maybe_async] #[async_trait] impl PersonalAccessTokenClient for ClientWrapper { async fn get_personal_access_tokens(&self) -> Result<Vec<PersonalAccessTokenInfo>, IggyError> { diff --git a/core/sdk/src/client_wrappers/binary_segment_client.rs b/core/sdk/src/client_wrappers/binary_segment_client.rs index 53845df0..b42e76fe 100644 --- a/core/sdk/src/client_wrappers/binary_segment_client.rs +++ b/core/sdk/src/client_wrappers/binary_segment_client.rs @@ -21,6 +21,7 @@ use async_trait::async_trait; use iggy_binary_protocol::SegmentClient; use iggy_common::{Identifier, IggyError}; +#[maybe_async::maybe_async] #[async_trait] impl SegmentClient for ClientWrapper { async fn delete_segments( diff --git a/core/sdk/src/client_wrappers/binary_stream_client.rs b/core/sdk/src/client_wrappers/binary_stream_client.rs index 37ee448c..cd8ce8d4 100644 --- a/core/sdk/src/client_wrappers/binary_stream_client.rs +++ b/core/sdk/src/client_wrappers/binary_stream_client.rs @@ -21,6 +21,7 @@ use async_trait::async_trait; use iggy_binary_protocol::StreamClient; use iggy_common::{Identifier, IggyError, Stream, StreamDetails}; +#[maybe_async::maybe_async] #[async_trait] impl StreamClient for ClientWrapper { async fn get_stream(&self, stream_id: &Identifier) -> Result<Option<StreamDetails>, IggyError> { diff --git a/core/sdk/src/client_wrappers/binary_system_client.rs b/core/sdk/src/client_wrappers/binary_system_client.rs index af69c08f..e9718bfd 100644 --- a/core/sdk/src/client_wrappers/binary_system_client.rs +++ b/core/sdk/src/client_wrappers/binary_system_client.rs @@ -24,6 +24,7 @@ use iggy_common::{ SystemSnapshotType, }; +#[maybe_async::maybe_async] #[async_trait] impl SystemClient for ClientWrapper { async fn get_stats(&self) -> Result<Stats, IggyError> { diff --git a/core/sdk/src/client_wrappers/binary_topic_client.rs b/core/sdk/src/client_wrappers/binary_topic_client.rs index ae6e733e..55fbdcca 100644 --- a/core/sdk/src/client_wrappers/binary_topic_client.rs +++ b/core/sdk/src/client_wrappers/binary_topic_client.rs @@ -23,6 +23,7 @@ use iggy_common::{ CompressionAlgorithm, Identifier, IggyError, IggyExpiry, MaxTopicSize, Topic, TopicDetails, }; +#[maybe_async::maybe_async] #[async_trait] impl TopicClient for ClientWrapper { async fn get_topic( diff --git a/core/sdk/src/client_wrappers/binary_user_client.rs b/core/sdk/src/client_wrappers/binary_user_client.rs index d05174f4..486e937e 100644 --- a/core/sdk/src/client_wrappers/binary_user_client.rs +++ b/core/sdk/src/client_wrappers/binary_user_client.rs @@ -23,6 +23,7 @@ use iggy_common::{ Identifier, IdentityInfo, IggyError, Permissions, UserInfo, UserInfoDetails, UserStatus, }; +#[maybe_async::maybe_async] #[async_trait] impl UserClient for ClientWrapper { async fn get_user(&self, user_id: &Identifier) -> Result<Option<UserInfoDetails>, IggyError> { diff --git a/core/sdk/src/clients/binary_consumer_group.rs b/core/sdk/src/clients/binary_consumer_group.rs index 59298131..5dd5a636 100644 --- a/core/sdk/src/clients/binary_consumer_group.rs +++ b/core/sdk/src/clients/binary_consumer_group.rs @@ -23,6 +23,7 @@ use iggy_binary_protocol::{ConsumerGroupClient, UserClient}; use iggy_common::locking::IggySharedMutFn; use iggy_common::{ConsumerGroup, ConsumerGroupDetails, Identifier, IggyError}; +#[maybe_async::maybe_async] #[async_trait] impl ConsumerGroupClient for IggyClient { async fn get_consumer_group( @@ -104,9 +105,17 @@ impl ConsumerGroupClient for IggyClient { } } +#[cfg(feature = "async")] #[async_trait] impl AsyncDrop for IggyClient { async fn async_drop(&mut self) { let _ = self.client.read().await.logout_user().await; } } + +#[cfg(feature = "sync")] +impl Drop for IggyClient { + fn drop(&mut self) { + let _ = self.client.read().logout_user(); + } +} diff --git a/core/sdk/src/clients/binary_consumer_offset.rs b/core/sdk/src/clients/binary_consumer_offset.rs index 1a6965c2..0ce822e2 100644 --- a/core/sdk/src/clients/binary_consumer_offset.rs +++ b/core/sdk/src/clients/binary_consumer_offset.rs @@ -22,6 +22,7 @@ use iggy_binary_protocol::ConsumerOffsetClient; use iggy_common::locking::IggySharedMutFn; use iggy_common::{Consumer, ConsumerOffsetInfo, Identifier, IggyError}; +#[maybe_async::maybe_async] #[async_trait] impl ConsumerOffsetClient for IggyClient { async fn store_consumer_offset( diff --git a/core/sdk/src/clients/binary_message.rs b/core/sdk/src/clients/binary_message.rs index 704fd8a5..ac9e1f76 100644 --- a/core/sdk/src/clients/binary_message.rs +++ b/core/sdk/src/clients/binary_message.rs @@ -25,6 +25,7 @@ use iggy_common::{ Consumer, Identifier, IggyError, IggyMessage, Partitioning, PolledMessages, PollingStrategy, }; +#[maybe_async::maybe_async] #[async_trait] impl MessageClient for IggyClient { async fn poll_messages( diff --git a/core/sdk/src/clients/binary_partitions.rs b/core/sdk/src/clients/binary_partitions.rs index 0af237fa..174b5351 100644 --- a/core/sdk/src/clients/binary_partitions.rs +++ b/core/sdk/src/clients/binary_partitions.rs @@ -22,6 +22,7 @@ use iggy_binary_protocol::PartitionClient; use iggy_common::locking::IggySharedMutFn; use iggy_common::{Identifier, IggyError}; +#[maybe_async::maybe_async] #[async_trait] impl PartitionClient for IggyClient { async fn create_partitions( diff --git a/core/sdk/src/clients/binary_personal_access_tokens.rs b/core/sdk/src/clients/binary_personal_access_tokens.rs index 54ac0f47..44b23441 100644 --- a/core/sdk/src/clients/binary_personal_access_tokens.rs +++ b/core/sdk/src/clients/binary_personal_access_tokens.rs @@ -25,6 +25,7 @@ use iggy_common::{ RawPersonalAccessToken, }; +#[maybe_async::maybe_async] #[async_trait] impl PersonalAccessTokenClient for IggyClient { async fn get_personal_access_tokens(&self) -> Result<Vec<PersonalAccessTokenInfo>, IggyError> { diff --git a/core/sdk/src/clients/binary_segments.rs b/core/sdk/src/clients/binary_segments.rs index a0c3894e..500e32c9 100644 --- a/core/sdk/src/clients/binary_segments.rs +++ b/core/sdk/src/clients/binary_segments.rs @@ -22,6 +22,7 @@ use iggy_binary_protocol::SegmentClient; use iggy_common::locking::IggySharedMutFn; use iggy_common::{Identifier, IggyError}; +#[maybe_async::maybe_async] #[async_trait] impl SegmentClient for IggyClient { async fn delete_segments( diff --git a/core/sdk/src/clients/binary_streams.rs b/core/sdk/src/clients/binary_streams.rs index 0236a3b4..01431a23 100644 --- a/core/sdk/src/clients/binary_streams.rs +++ b/core/sdk/src/clients/binary_streams.rs @@ -22,6 +22,7 @@ use iggy_binary_protocol::StreamClient; use iggy_common::locking::IggySharedMutFn; use iggy_common::{Identifier, IggyError, Stream, StreamDetails}; +#[maybe_async::maybe_async] #[async_trait] impl StreamClient for IggyClient { async fn get_stream(&self, stream_id: &Identifier) -> Result<Option<StreamDetails>, IggyError> { diff --git a/core/sdk/src/clients/binary_system.rs b/core/sdk/src/clients/binary_system.rs index edc45867..9d8fabf4 100644 --- a/core/sdk/src/clients/binary_system.rs +++ b/core/sdk/src/clients/binary_system.rs @@ -25,6 +25,7 @@ use iggy_common::{ SystemSnapshotType, }; +#[maybe_async::maybe_async] #[async_trait] impl SystemClient for IggyClient { async fn get_stats(&self) -> Result<Stats, IggyError> { diff --git a/core/sdk/src/clients/binary_topics.rs b/core/sdk/src/clients/binary_topics.rs index 6275d7a4..62d9e425 100644 --- a/core/sdk/src/clients/binary_topics.rs +++ b/core/sdk/src/clients/binary_topics.rs @@ -24,6 +24,7 @@ use iggy_common::{ CompressionAlgorithm, Identifier, IggyError, IggyExpiry, MaxTopicSize, Topic, TopicDetails, }; +#[maybe_async::maybe_async] #[async_trait] impl TopicClient for IggyClient { async fn get_topic( diff --git a/core/sdk/src/clients/binary_users.rs b/core/sdk/src/clients/binary_users.rs index 6408f231..146e6a7e 100644 --- a/core/sdk/src/clients/binary_users.rs +++ b/core/sdk/src/clients/binary_users.rs @@ -24,6 +24,7 @@ use iggy_common::{ Identifier, IdentityInfo, IggyError, Permissions, UserInfo, UserInfoDetails, UserStatus, }; +#[maybe_async::maybe_async] #[async_trait] impl UserClient for IggyClient { async fn get_user(&self, user_id: &Identifier) -> Result<Option<UserInfoDetails>, IggyError> { diff --git a/core/sdk/src/lib.rs b/core/sdk/src/lib.rs index 52e7e118..e57b7d04 100644 --- a/core/sdk/src/lib.rs +++ b/core/sdk/src/lib.rs @@ -21,8 +21,10 @@ pub mod client_provider; pub mod client_wrappers; pub mod clients; pub mod consumer_ext; +#[cfg(feature = "async")] pub mod http; pub mod prelude; +#[cfg(feature = "async")] pub mod quic; pub mod stream_builder; pub mod tcp; diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs index e93112a1..a02f4b66 100644 --- a/core/sdk/src/prelude.rs +++ b/core/sdk/src/prelude.rs @@ -44,7 +44,12 @@ pub use crate::stream_builder::IggyConsumerConfig; pub use crate::stream_builder::IggyStreamConsumer; pub use crate::stream_builder::{IggyProducerConfig, IggyStreamProducer}; pub use crate::stream_builder::{IggyStream, IggyStreamConfig}; +// Async-only клиенты +#[cfg(feature = "async")] pub use crate::tcp::tcp_client::TcpClient; + +// Sync-only клиенты +#[cfg(feature = "sync")] pub use crate::tcp::tcp_client_sync::{TcpClientSync, TcpClientSyncTcp, TcpClientSyncTls}; pub use iggy_binary_protocol::{ Client, ConsumerGroupClient, ConsumerOffsetClient, MessageClient, PartitionClient, diff --git a/core/sdk/src/stream_builder/build/build_iggy_client.rs b/core/sdk/src/stream_builder/build/build_iggy_client.rs index bf06b972..b5a3b144 100644 --- a/core/sdk/src/stream_builder/build/build_iggy_client.rs +++ b/core/sdk/src/stream_builder/build/build_iggy_client.rs @@ -35,6 +35,7 @@ use iggy_binary_protocol::Client; /// If the connection string is invalid or the client cannot be initialized, /// an `IggyError` will be returned. /// +#[maybe_async::maybe_async] pub(crate) async fn build_iggy_client(connection_string: &str) -> Result<IggyClient, IggyError> { let client = IggyClient::from_connection_string(connection_string)?; client.connect().await?; diff --git a/core/sdk/src/stream_builder/build/build_iggy_consumer.rs b/core/sdk/src/stream_builder/build/build_iggy_consumer.rs index dd7260bd..2cd105eb 100644 --- a/core/sdk/src/stream_builder/build/build_iggy_consumer.rs +++ b/core/sdk/src/stream_builder/build/build_iggy_consumer.rs @@ -38,6 +38,7 @@ use tracing::{error, trace}; /// This function will create a new `IggyConsumer` with the given `IggyClient` and `IggyConsumerConfig`. /// The `IggyConsumerConfig` fields are used to configure the `IggyConsumer`. /// +#[maybe_async::maybe_async] pub(crate) async fn build_iggy_consumer( client: &IggyClient, config: &IggyConsumerConfig, diff --git a/core/sdk/src/stream_builder/build/build_iggy_producer.rs b/core/sdk/src/stream_builder/build/build_iggy_producer.rs index 9522452b..5780b41f 100644 --- a/core/sdk/src/stream_builder/build/build_iggy_producer.rs +++ b/core/sdk/src/stream_builder/build/build_iggy_producer.rs @@ -39,6 +39,7 @@ use tracing::{error, trace}; /// This function will create a new `IggyProducer` with the given `IggyClient` and `IggyProducerConfig`. /// The `IggyProducerConfig` fields are used to configure the `IggyProducer`. /// +#[maybe_async::maybe_async] pub(crate) async fn build_iggy_producer( client: &IggyClient, config: &IggyProducerConfig, diff --git a/core/sdk/src/stream_builder/build/build_stream_topic.rs b/core/sdk/src/stream_builder/build/build_stream_topic.rs index 2a8e0e73..59beb397 100644 --- a/core/sdk/src/stream_builder/build/build_stream_topic.rs +++ b/core/sdk/src/stream_builder/build/build_stream_topic.rs @@ -40,6 +40,7 @@ use tracing::{trace, warn}; /// /// * `IggyError` - If the iggy stream topic cannot be build. /// +#[maybe_async::maybe_async] pub(crate) async fn build_iggy_stream_topic_if_not_exists( client: &IggyClient, config: &IggyConsumerConfig, diff --git a/core/sdk/src/stream_builder/iggy_stream.rs b/core/sdk/src/stream_builder/iggy_stream.rs index df759824..fafed966 100644 --- a/core/sdk/src/stream_builder/iggy_stream.rs +++ b/core/sdk/src/stream_builder/iggy_stream.rs @@ -38,6 +38,7 @@ impl IggyStream { /// /// If the builds fails, an `IggyError` is returned. /// + #[maybe_async::maybe_async] pub async fn build( client: &IggyClient, config: &IggyStreamConfig, @@ -68,6 +69,7 @@ impl IggyStream { /// /// If the builds fails, an `IggyError` is returned. /// + #[maybe_async::maybe_async] pub async fn with_client_from_connection_string( connection_string: &str, config: &IggyStreamConfig, @@ -97,6 +99,7 @@ impl IggyStream { /// If the connection string is invalid or the client cannot be initialized, /// an `IggyError` will be returned. /// + #[maybe_async::maybe_async] pub async fn build_iggy_client(connection_string: &str) -> Result<IggyClient, IggyError> { trace!("Build and connect iggy client"); let client = build::build_iggy_client(connection_string).await?; diff --git a/core/sdk/src/stream_builder/iggy_stream_consumer.rs b/core/sdk/src/stream_builder/iggy_stream_consumer.rs index 1b972a6f..b1287456 100644 --- a/core/sdk/src/stream_builder/iggy_stream_consumer.rs +++ b/core/sdk/src/stream_builder/iggy_stream_consumer.rs @@ -37,6 +37,7 @@ impl IggyStreamConsumer { /// /// If the builds fails, an `IggyError` is returned. /// + #[maybe_async::maybe_async] pub async fn build( client: &IggyClient, config: &IggyConsumerConfig, @@ -67,6 +68,7 @@ impl IggyStreamConsumer { /// /// If the builds fails, an `IggyError` is returned. /// + #[maybe_async::maybe_async] pub async fn with_client_from_url( connection_string: &str, config: &IggyConsumerConfig, @@ -100,6 +102,7 @@ impl IggyStreamConsumer { /// If the connection string is invalid or the client cannot be initialized, /// an `IggyError` will be returned. /// + #[maybe_async::maybe_async] pub async fn build_iggy_client(connection_string: &str) -> Result<IggyClient, IggyError> { trace!("Build and connect iggy client"); let client = build::build_iggy_client(connection_string).await?; diff --git a/core/sdk/src/stream_builder/iggy_stream_producer.rs b/core/sdk/src/stream_builder/iggy_stream_producer.rs index 31669518..e20d791d 100644 --- a/core/sdk/src/stream_builder/iggy_stream_producer.rs +++ b/core/sdk/src/stream_builder/iggy_stream_producer.rs @@ -38,6 +38,7 @@ impl IggyStreamProducer { /// /// If the client is not connected or the producer cannot be built, an `IggyError` is returned. /// + #[maybe_async::maybe_async] pub async fn build( client: &IggyClient, config: &IggyProducerConfig, @@ -66,6 +67,7 @@ impl IggyStreamProducer { /// /// If the client cannot be connected or the producer cannot be built, an `IggyError` is returned. /// + #[maybe_async::maybe_async] pub async fn with_client_from_url( connection_string: &str, config: &IggyProducerConfig, @@ -97,6 +99,7 @@ impl IggyStreamProducer { /// If the connection string is invalid or the client cannot be initialized, /// an `IggyError` will be returned. /// + #[maybe_async::maybe_async] pub async fn build_iggy_client(connection_string: &str) -> Result<IggyClient, IggyError> { trace!("Build and connect iggy client"); let client = build::build_iggy_client(connection_string).await?; diff --git a/core/sdk/src/tcp/mod.rs b/core/sdk/src/tcp/mod.rs index e98605c4..37fd43c7 100644 --- a/core/sdk/src/tcp/mod.rs +++ b/core/sdk/src/tcp/mod.rs @@ -16,11 +16,14 @@ * under the License. */ +#[cfg(feature = "async")] pub(crate) mod tcp_client; + +#[cfg(feature = "sync")] +pub mod tcp_client_sync; + pub(crate) mod tcp_connection_stream; pub(crate) mod tcp_connection_stream_kind; pub(crate) mod tcp_stream; pub(crate) mod tcp_tls_connection_stream; pub(crate) mod tcp_tls_verifier; - -pub mod tcp_client_sync; diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs index ab299cc4..3de6c868 100644 --- a/core/sdk/src/tcp/tcp_client.rs +++ b/core/sdk/src/tcp/tcp_client.rs @@ -62,6 +62,7 @@ impl Default for TcpClient { } } +#[maybe_async::async_impl] #[async_trait] impl Client for TcpClient { async fn connect(&self) -> Result<(), IggyError> { @@ -81,7 +82,7 @@ impl Client for TcpClient { } } -#[async_trait] +#[maybe_async::async_impl] #[async_trait] impl BinaryTransport for TcpClient { async fn get_state(&self) -> ClientState { @@ -144,6 +145,7 @@ impl BinaryTransport for TcpClient { } } +#[maybe_async::async_impl] impl BinaryClient for TcpClient {} impl TcpClient { diff --git a/core/sdk/src/tcp/tcp_client_sync.rs b/core/sdk/src/tcp/tcp_client_sync.rs index c60cb8f5..91c539a9 100644 --- a/core/sdk/src/tcp/tcp_client_sync.rs +++ b/core/sdk/src/tcp/tcp_client_sync.rs @@ -34,7 +34,7 @@ where pub(crate) config: Arc<T::Config>, inner: Mutex<ProtocolCore>, stream: Mutex<Option<T::Stream>>, - events: (Sender<DiagnosticEvent>, Receiver<DiagnosticEvent>), + events: (Sender<DiagnosticEvent>, Receiver<DiagnosticEvent>), // TODO change async_broadcast to sync crate via feature flag recv_buffer: Mutex<BytesMut>, client_address: Mutex<Option<SocketAddr>>, connected_at: Mutex<Option<IggyTimestamp>>, @@ -67,7 +67,7 @@ where }) } - async fn get_client_address_value(&self) -> String { + fn get_client_address_value(&self) -> String { let client_address = self.client_address.lock().unwrap(); if let Some(client_address) = &*client_address { client_address.to_string() @@ -77,6 +77,7 @@ where } } +#[maybe_async::sync_impl] #[async_trait] impl<T> Client for TcpClientSync<T> where @@ -84,7 +85,7 @@ where T::Config: Send + Sync + Debug, T::Stream: Send + Sync + Debug, { - async fn connect(&self) -> Result<(), IggyError> { + fn connect(&self) -> Result<(), IggyError> { let address = self.config.server_address(); let config = self.config.clone(); @@ -100,39 +101,38 @@ where let now = IggyTimestamp::now(); *self.connected_at.lock().unwrap() = Some(now); - self.publish_event(DiagnosticEvent::Connected).await; + self.publish_event(DiagnosticEvent::Connected); - let client_address = self.get_client_address_value().await; + let client_address = self.get_client_address_value(); debug!("TcpClientSync client: {client_address} has connected to server at: {now}"); } Ok(()) } - async fn disconnect(&self) -> Result<(), IggyError> { - if self.get_state().await == ClientState::Disconnected { + fn disconnect(&self) -> Result<(), IggyError> { + if self.get_state() == ClientState::Disconnected { return Ok(()); } - let client_address = self.get_client_address_value().await; + let client_address = self.get_client_address_value(); debug!("TcpClientSync client: {client_address} is disconnecting from server..."); - // Scope the mutex guards to ensure they're dropped before any await { let mut core = self.inner.lock().unwrap(); core.disconnect(); *self.stream.lock().unwrap() = None; } - self.publish_event(DiagnosticEvent::Disconnected).await; + self.publish_event(DiagnosticEvent::Disconnected); let now = IggyTimestamp::now(); debug!("TcpClientSync client: {client_address} has disconnected from server at: {now}."); Ok(()) } - async fn shutdown(&self) -> Result<(), IggyError> { - if self.get_state().await == ClientState::Shutdown { + fn shutdown(&self) -> Result<(), IggyError> { + if self.get_state() == ClientState::Shutdown { return Ok(()); } @@ -147,15 +147,16 @@ where core.shutdown(); } - self.publish_event(DiagnosticEvent::Shutdown).await; + self.publish_event(DiagnosticEvent::Shutdown); Ok(()) } - async fn subscribe_events(&self) -> async_broadcast::Receiver<iggy_common::DiagnosticEvent> { + fn subscribe_events(&self) -> async_broadcast::Receiver<iggy_common::DiagnosticEvent> { self.events.1.clone() } } +#[maybe_async::sync_impl] #[async_trait] impl<T> BinaryTransport for TcpClientSync<T> where @@ -163,17 +164,17 @@ where T::Config: Send + Sync + Debug, T::Stream: Send + Sync + Debug, { - async fn get_state(&self) -> ClientState { + fn get_state(&self) -> ClientState { self.inner.lock().unwrap().state } - async fn set_state(&self, client_state: ClientState) { + fn set_state(&self, client_state: ClientState) { let mut core = self.inner.lock().unwrap(); core.state = client_state } - async fn publish_event(&self, event: DiagnosticEvent) { - if let Err(error) = self.events.0.broadcast(event).await { + fn publish_event(&self, event: DiagnosticEvent) { + if let Err(error) = self.events.0.broadcast(event) { error!("Failed to send a TCP diagnostic event: {error}"); } } @@ -182,21 +183,19 @@ where self.config.heartbeat_interval() } - async fn send_with_response<C: Command>(&self, command: &C) -> Result<Bytes, IggyError> { + fn send_with_response<C: Command>(&self, command: &C) -> Result<Bytes, IggyError> { command.validate()?; self.send_raw_with_response(command.code(), command.to_bytes()) - .await } - async fn send_raw_with_response(&self, code: u32, payload: Bytes) -> Result<Bytes, IggyError> { - let result = self.send_raw(code, payload.clone()).await; + fn send_raw_with_response(&self, code: u32, payload: Bytes) -> Result<Bytes, IggyError> { + let result = self.send_raw(code, payload.clone()); if result.is_ok() { return result; } let error = result.unwrap_err(); - // Check if we should attempt reconnection using ProtocolCore logic let should_reconnect = { let core = self.inner.lock().unwrap(); core.should_reconnect_for_error(&error) @@ -206,16 +205,14 @@ where return Err(error); } - // Perform reconnection let server_address = self.config.server_address(); { let mut core = self.inner.lock().unwrap(); core.initiate_reconnection(server_address)?; } - // Attempt to reconnect - self.connect().await?; - self.send_raw(code, payload).await + self.connect()?; + self.send_raw(code, payload) } } @@ -301,7 +298,7 @@ where Ok(Some(stream)) } - async fn send_raw(&self, code: u32, payload: Bytes) -> Result<Bytes, IggyError> { + fn send_raw(&self, code: u32, payload: Bytes) -> Result<Bytes, IggyError> { let mut core = self.inner.lock().unwrap(); let mut stream = self.stream.lock().unwrap(); let mut recv_buf = self.recv_buffer.lock().unwrap(); @@ -399,9 +396,7 @@ fn read<T: Transport>( } } -// Specific implementations for TCP impl TcpClientSync<TcpTransport> { - /// Create a new TCP client for the provided server address. pub fn new( server_address: &str, auto_sign_in: AutoLogin, @@ -418,7 +413,6 @@ impl TcpClientSync<TcpTransport> { ) } - /// Create a new TCP client from the provided connection string. pub fn from_connection_string(connection_string: &str) -> Result<Self, IggyError> { if ConnectionStringUtils::parse_protocol(connection_string)? != TransportProtocol::Tcp { return Err(IggyError::InvalidConnectionString); @@ -432,7 +426,6 @@ impl TcpClientSync<TcpTransport> { ) } - /// Create a new TCP client based on the provided configuration. pub fn create_tcp(config: Arc<TcpClientConfig>) -> Result<Self, IggyError> { Self::create(Arc::new(TcpTransport), config) } @@ -443,7 +436,6 @@ pub type TcpClientSyncTcp = TcpClientSync<TcpTransport>; pub type TcpClientSyncTls = TcpClientSync<TcpTlsTransport>; impl TcpClientSyncTls { - /// Create a new TLS TCP client from the provided connection string. pub fn from_connection_string_tls(connection_string: &str) -> Result<Self, IggyError> { if ConnectionStringUtils::parse_protocol(connection_string)? != TransportProtocol::Tcp { return Err(IggyError::InvalidConnectionString); @@ -457,12 +449,10 @@ impl TcpClientSyncTls { ) } - /// Create a new TLS TCP client based on the provided configuration. pub fn create_tcp_tls(config: Arc<TcpClientConfig>) -> Result<Self, IggyError> { Self::create(Arc::new(TcpTlsTransport), config) } - /// Create a new TLS TCP client for the provided server address. pub fn new_tls( server_address: &str, domain: &str, @@ -489,8 +479,6 @@ impl Default for TcpClientSync<TcpTransport> { } } -/// Unit tests for TcpClientSync. -/// Tests connection string parsing, configuration validation, and constructors. #[cfg(test)] mod tests { use super::*;
