This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch add-sync-client in repository https://gitbox.apache.org/repos/asf/iggy.git
commit efecd1a011be9e7c2fa487b09798d6880eac7ab8 Author: haze518 <[email protected]> AuthorDate: Mon Sep 22 08:35:13 2025 +0600 raw variant --- core/sdk/Cargo.toml | 6 +- core/sdk/src/client_provider.rs | 35 +- core/sdk/src/client_wrappers/binary_client.rs | 107 +++-- .../binary_consumer_group_client.rs | 460 ++++++++++++--------- .../binary_consumer_offset_client.rs | 244 ++++++----- .../src/client_wrappers/binary_message_client.rs | 316 ++++++++------ .../src/client_wrappers/binary_partition_client.rs | 161 +++++--- .../binary_personal_access_token_client.rs | 128 ++++-- .../src/client_wrappers/binary_segment_client.rs | 95 +++-- .../src/client_wrappers/binary_stream_client.rs | 163 +++++--- .../src/client_wrappers/binary_system_client.rs | 185 ++++++--- .../sdk/src/client_wrappers/binary_topic_client.rs | 442 +++++++++++--------- core/sdk/src/client_wrappers/binary_user_client.rs | 349 ++++++++++------ core/sdk/src/client_wrappers/client_wrapper.rs | 38 +- core/sdk/src/client_wrappers/mod.rs | 9 +- core/sdk/src/clients/client.rs | 128 ++++-- core/sdk/src/clients/client_builder.rs | 36 +- core/sdk/src/clients/consumer.rs | 2 +- core/sdk/src/clients/consumer_builder.rs | 2 +- core/sdk/src/clients/producer.rs | 2 +- core/sdk/src/clients/producer_builder.rs | 2 +- .../stream_builder/build/build_iggy_consumer.rs | 2 + .../stream_builder/build/build_iggy_producer.rs | 1 + core/sdk/src/tcp/tcp_client_sync.rs | 2 +- 24 files changed, 1793 insertions(+), 1122 deletions(-) diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml index b3c55308..a8e6f158 100644 --- a/core/sdk/Cargo.toml +++ b/core/sdk/Cargo.toml @@ -31,8 +31,8 @@ readme = "../../README.md" [features] default = ["tokio_lock"] tokio_lock = [] -sync = ["maybe-async/is_sync"] -async = [] +sync = ["maybe-async/is_sync", "iggy_binary_protocol/sync"] +async = ["iggy_binary_protocol/async"] # Moved to common crate. Not sure if this is even needed anymore #fast_async_lock = ["dep:fast-async-mutex"] @@ -48,7 +48,7 @@ dashmap = { workspace = true } flume = { workspace = true } futures = { workspace = true } futures-util = { workspace = true } -iggy_binary_protocol = { workspace = true } +iggy_binary_protocol = { workspace = true, features = [] } iggy_common = { workspace = true } maybe-async = "0.2.10" num_cpus = "1.17.0" diff --git a/core/sdk/src/client_provider.rs b/core/sdk/src/client_provider.rs index ab9d329a..1443aed6 100644 --- a/core/sdk/src/client_provider.rs +++ b/core/sdk/src/client_provider.rs @@ -19,13 +19,23 @@ use crate::client_wrappers::client_wrapper::ClientWrapper; #[allow(deprecated)] use crate::clients::client::IggyClient; + +#[cfg(feature = "async")] use crate::http::http_client::HttpClient; +#[cfg(feature = "async")] +use crate::quic::quic_client::QuicClient; +#[cfg(feature = "async")] +use crate::tcp::tcp_client::TcpClient; + use crate::prelude::{ - ClientError, HttpClientConfig, IggyDuration, QuicClientConfig, QuicClientReconnectionConfig, + ClientError, IggyDuration, +}; + +#[cfg(feature = "async")] +use crate::prelude::{ + HttpClientConfig, QuicClientConfig, QuicClientReconnectionConfig, TcpClientConfig, TcpClientReconnectionConfig, }; -use crate::quic::quic_client::QuicClient; -use crate::tcp::tcp_client::TcpClient; use iggy_binary_protocol::Client; use iggy_common::{AutoLogin, Credentials}; use std::str::FromStr; @@ -46,10 +56,13 @@ pub struct ClientProviderConfig { /// The transport to use. Valid values are `quic`, `http` and `tcp`. pub transport: String, /// The optional configuration for the HTTP transport. + #[cfg(feature = "async")] pub http: Option<Arc<HttpClientConfig>>, /// The optional configuration for the QUIC transport. + #[cfg(feature = "async")] pub quic: Option<Arc<QuicClientConfig>>, /// The optional configuration for the TCP transport. + #[cfg(feature = "async")] pub tcp: Option<Arc<TcpClientConfig>>, } @@ -57,8 +70,11 @@ impl Default for ClientProviderConfig { fn default() -> ClientProviderConfig { ClientProviderConfig { transport: TCP_TRANSPORT.to_string(), + #[cfg(feature = "async")] http: Some(Arc::new(HttpClientConfig::default())), + #[cfg(feature = "async")] quic: Some(Arc::new(QuicClientConfig::default())), + #[cfg(feature = "async")] tcp: Some(Arc::new(TcpClientConfig::default())), } } @@ -79,10 +95,14 @@ impl ClientProviderConfig { let transport = args.transport; let mut config = Self { transport, + #[cfg(feature = "async")] http: None, + #[cfg(feature = "async")] quic: None, + #[cfg(feature = "async")] tcp: None, }; + #[cfg(feature = "async")] match config.transport.as_str() { QUIC_TRANSPORT => { config.quic = Some(Arc::new(QuicClientConfig { @@ -157,21 +177,29 @@ impl ClientProviderConfig { _ => return Err(ClientError::InvalidTransport(config.transport.clone())), } + #[cfg(feature = "sync")] + if config.transport != TCP_TRANSPORT { + return Err(ClientError::InvalidTransport(config.transport.clone())); + } + Ok(config) } } +#[cfg(feature = "async")] /// Create a default `IggyClient` with the default configuration. pub async fn get_default_client_() -> Result<IggyClient, ClientError> { get_client(Arc::new(ClientProviderConfig::default())).await } +#[cfg(feature = "async")] /// Create a `IggyClient` for the specific transport based on the provided configuration. pub async fn get_client(config: Arc<ClientProviderConfig>) -> Result<IggyClient, ClientError> { let client = get_raw_connected_client(config).await?; Ok(IggyClient::builder().with_client(client).build()?) } +#[cfg(feature = "async")] /// Create a `Client` for the specific transport based on the provided configuration. pub async fn get_raw_connected_client( config: Arc<ClientProviderConfig>, @@ -179,6 +207,7 @@ pub async fn get_raw_connected_client( get_raw_client(config, true).await } +#[cfg(feature = "async")] /// Create a `Client` for the specific transport based on the provided configuration. pub async fn get_raw_client( config: Arc<ClientProviderConfig>, diff --git a/core/sdk/src/client_wrappers/binary_client.rs b/core/sdk/src/client_wrappers/binary_client.rs index a4206134..f205e112 100644 --- a/core/sdk/src/client_wrappers/binary_client.rs +++ b/core/sdk/src/client_wrappers/binary_client.rs @@ -16,52 +16,87 @@ * under the License. */ -use crate::client_wrappers::client_wrapper::ClientWrapper; +use crate::client_wrappers::ClientWrapper; use async_broadcast::Receiver; -use async_trait::async_trait; use iggy_binary_protocol::Client; use iggy_common::{DiagnosticEvent, IggyError}; -#[maybe_async::maybe_async] -#[async_trait] -impl Client for ClientWrapper { - async fn connect(&self) -> Result<(), IggyError> { - match self { - ClientWrapper::Iggy(client) => client.connect().await, - ClientWrapper::Http(client) => client.connect().await, - ClientWrapper::Tcp(client) => client.connect().await, - ClientWrapper::TcpSync(client) => client.connect().await, - ClientWrapper::Quic(client) => client.connect().await, +#[cfg(feature = "async")] +pub mod async_impl { + use super::*; + use async_trait::async_trait; + + #[async_trait] + impl Client for ClientWrapper { + async fn connect(&self) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => client.connect().await, + ClientWrapper::Http(client) => client.connect().await, + ClientWrapper::Tcp(client) => client.connect().await, + ClientWrapper::Quic(client) => client.connect().await, + } } - } - async fn disconnect(&self) -> Result<(), IggyError> { - match self { - ClientWrapper::Iggy(client) => client.disconnect().await, - ClientWrapper::Http(client) => client.disconnect().await, - ClientWrapper::Tcp(client) => client.disconnect().await, - ClientWrapper::TcpSync(client) => client.disconnect().await, - ClientWrapper::Quic(client) => client.disconnect().await, + async fn disconnect(&self) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => client.disconnect().await, + ClientWrapper::Http(client) => client.disconnect().await, + ClientWrapper::Tcp(client) => client.disconnect().await, + ClientWrapper::Quic(client) => client.disconnect().await, + } } - } - async fn shutdown(&self) -> Result<(), IggyError> { - match self { - ClientWrapper::Iggy(client) => client.shutdown().await, - ClientWrapper::Http(client) => client.shutdown().await, - ClientWrapper::Tcp(client) => client.shutdown().await, - ClientWrapper::TcpSync(client) => client.shutdown().await, - ClientWrapper::Quic(client) => client.shutdown().await, + async fn shutdown(&self) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => client.shutdown().await, + ClientWrapper::Http(client) => client.shutdown().await, + ClientWrapper::Tcp(client) => client.shutdown().await, + ClientWrapper::Quic(client) => client.shutdown().await, + } } - } - async fn subscribe_events(&self) -> Receiver<DiagnosticEvent> { - match self { - ClientWrapper::Iggy(client) => client.subscribe_events().await, - ClientWrapper::Http(client) => client.subscribe_events().await, - ClientWrapper::Tcp(client) => client.subscribe_events().await, - ClientWrapper::TcpSync(client) => client.subscribe_events().await, - ClientWrapper::Quic(client) => client.subscribe_events().await, + async fn subscribe_events(&self) -> Receiver<DiagnosticEvent> { + match self { + ClientWrapper::Iggy(client) => client.subscribe_events().await, + ClientWrapper::Http(client) => client.subscribe_events().await, + ClientWrapper::Tcp(client) => client.subscribe_events().await, + ClientWrapper::Quic(client) => client.subscribe_events().await, + } } } } + +#[cfg(feature = "sync")] +pub mod sync_impl { + use super::*; + + impl Client for ClientWrapper { + fn connect(&self) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.connect(), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + + fn disconnect(&self) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.disconnect(), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + + fn shutdown(&self) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.shutdown(), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + + fn subscribe_events(&self) -> Receiver<DiagnosticEvent> { + match self { + ClientWrapper::TcpSync(client) => client.subscribe_events(), + ClientWrapper::Iggy(_) => panic!("add here smth"), + } + } + } +} \ No newline at end of file diff --git a/core/sdk/src/client_wrappers/binary_consumer_group_client.rs b/core/sdk/src/client_wrappers/binary_consumer_group_client.rs index 41fbe045..074f7532 100644 --- a/core/sdk/src/client_wrappers/binary_consumer_group_client.rs +++ b/core/sdk/src/client_wrappers/binary_consumer_group_client.rs @@ -16,238 +16,300 @@ * under the License. */ -use crate::client_wrappers::client_wrapper::ClientWrapper; +use crate::client_wrappers::ClientWrapper; use async_dropper::AsyncDrop; -use async_trait::async_trait; use iggy_binary_protocol::{ConsumerGroupClient, UserClient}; use iggy_common::{ConsumerGroup, ConsumerGroupDetails, Identifier, IggyError}; -#[maybe_async::maybe_async] -#[async_trait] -impl ConsumerGroupClient for ClientWrapper { - async fn get_consumer_group( - &self, - stream_id: &Identifier, - topic_id: &Identifier, - group_id: &Identifier, - ) -> Result<Option<ConsumerGroupDetails>, IggyError> { - match self { - ClientWrapper::Iggy(client) => { - client - .get_consumer_group(stream_id, topic_id, group_id) - .await - } - ClientWrapper::Http(client) => { - client - .get_consumer_group(stream_id, topic_id, group_id) - .await - } - ClientWrapper::Tcp(client) => { - client - .get_consumer_group(stream_id, topic_id, group_id) - .await - } - ClientWrapper::TcpSync(client) => { - client - .get_consumer_group(stream_id, topic_id, group_id) - .await - } - ClientWrapper::Quic(client) => { - client - .get_consumer_group(stream_id, topic_id, group_id) - .await +#[cfg(feature = "async")] +pub mod async_impl { + use super::*; + use async_trait::async_trait; + + #[async_trait] + impl ConsumerGroupClient for ClientWrapper { + async fn get_consumer_group( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + group_id: &Identifier, + ) -> Result<Option<ConsumerGroupDetails>, IggyError> { + match self { + ClientWrapper::Iggy(client) => { + client + .get_consumer_group(stream_id, topic_id, group_id) + .await + } + ClientWrapper::Http(client) => { + client + .get_consumer_group(stream_id, topic_id, group_id) + .await + } + ClientWrapper::Tcp(client) => { + client + .get_consumer_group(stream_id, topic_id, group_id) + .await + } + ClientWrapper::Quic(client) => { + client + .get_consumer_group(stream_id, topic_id, group_id) + .await + } } } - } - async fn get_consumer_groups( - &self, - stream_id: &Identifier, - topic_id: &Identifier, - ) -> Result<Vec<ConsumerGroup>, IggyError> { - match self { - ClientWrapper::Iggy(client) => client.get_consumer_groups(stream_id, topic_id).await, - ClientWrapper::Http(client) => client.get_consumer_groups(stream_id, topic_id).await, - ClientWrapper::Tcp(client) => client.get_consumer_groups(stream_id, topic_id).await, - ClientWrapper::TcpSync(client) => client.get_consumer_groups(stream_id, topic_id).await, - ClientWrapper::Quic(client) => client.get_consumer_groups(stream_id, topic_id).await, + async fn get_consumer_groups( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + ) -> Result<Vec<ConsumerGroup>, IggyError> { + match self { + ClientWrapper::Iggy(client) => client.get_consumer_groups(stream_id, topic_id).await, + ClientWrapper::Http(client) => client.get_consumer_groups(stream_id, topic_id).await, + ClientWrapper::Tcp(client) => client.get_consumer_groups(stream_id, topic_id).await, + ClientWrapper::Quic(client) => client.get_consumer_groups(stream_id, topic_id).await, + } } - } - async fn create_consumer_group( - &self, - stream_id: &Identifier, - topic_id: &Identifier, - name: &str, - group_id: Option<u32>, - ) -> Result<ConsumerGroupDetails, IggyError> { - match self { - ClientWrapper::Iggy(client) => { - client - .create_consumer_group(stream_id, topic_id, name, group_id) - .await - } - ClientWrapper::Http(client) => { - client - .create_consumer_group(stream_id, topic_id, name, group_id) - .await - } - ClientWrapper::Tcp(client) => { - client - .create_consumer_group(stream_id, topic_id, name, group_id) - .await - } - ClientWrapper::TcpSync(client) => { - client - .create_consumer_group(stream_id, topic_id, name, group_id) - .await - } - ClientWrapper::Quic(client) => { - client - .create_consumer_group(stream_id, topic_id, name, group_id) - .await + async fn create_consumer_group( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + name: &str, + group_id: Option<u32>, + ) -> Result<ConsumerGroupDetails, IggyError> { + match self { + ClientWrapper::Iggy(client) => { + client + .create_consumer_group(stream_id, topic_id, name, group_id) + .await + } + ClientWrapper::Http(client) => { + client + .create_consumer_group(stream_id, topic_id, name, group_id) + .await + } + ClientWrapper::Tcp(client) => { + client + .create_consumer_group(stream_id, topic_id, name, group_id) + .await + } + ClientWrapper::Quic(client) => { + client + .create_consumer_group(stream_id, topic_id, name, group_id) + .await + } } } - } - async fn delete_consumer_group( - &self, - stream_id: &Identifier, - topic_id: &Identifier, - group_id: &Identifier, - ) -> Result<(), IggyError> { - match self { - ClientWrapper::Iggy(client) => { - client - .delete_consumer_group(stream_id, topic_id, group_id) - .await - } - ClientWrapper::Http(client) => { - client - .delete_consumer_group(stream_id, topic_id, group_id) - .await - } - ClientWrapper::Tcp(client) => { - client - .delete_consumer_group(stream_id, topic_id, group_id) - .await - } - ClientWrapper::TcpSync(client) => { - client - .delete_consumer_group(stream_id, topic_id, group_id) - .await - } - ClientWrapper::Quic(client) => { - client - .delete_consumer_group(stream_id, topic_id, group_id) - .await + async fn delete_consumer_group( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + group_id: &Identifier, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => { + client + .delete_consumer_group(stream_id, topic_id, group_id) + .await + } + ClientWrapper::Http(client) => { + client + .delete_consumer_group(stream_id, topic_id, group_id) + .await + } + ClientWrapper::Tcp(client) => { + client + .delete_consumer_group(stream_id, topic_id, group_id) + .await + } + ClientWrapper::Quic(client) => { + client + .delete_consumer_group(stream_id, topic_id, group_id) + .await + } } } - } - async fn join_consumer_group( - &self, - stream_id: &Identifier, - topic_id: &Identifier, - group_id: &Identifier, - ) -> Result<(), IggyError> { - match self { - ClientWrapper::Iggy(client) => { - client - .join_consumer_group(stream_id, topic_id, group_id) - .await - } - ClientWrapper::Http(client) => { - client - .join_consumer_group(stream_id, topic_id, group_id) - .await - } - ClientWrapper::Tcp(client) => { - client - .join_consumer_group(stream_id, topic_id, group_id) - .await + async fn join_consumer_group( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + group_id: &Identifier, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => { + client + .join_consumer_group(stream_id, topic_id, group_id) + .await + } + ClientWrapper::Http(client) => { + client + .join_consumer_group(stream_id, topic_id, group_id) + .await + } + ClientWrapper::Tcp(client) => { + client + .join_consumer_group(stream_id, topic_id, group_id) + .await + } + ClientWrapper::Quic(client) => { + client + .join_consumer_group(stream_id, topic_id, group_id) + .await + } } - ClientWrapper::TcpSync(client) => { - client - .join_consumer_group(stream_id, topic_id, group_id) - .await - } - ClientWrapper::Quic(client) => { - client - .join_consumer_group(stream_id, topic_id, group_id) - .await + } + + async fn leave_consumer_group( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + group_id: &Identifier, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => { + client + .leave_consumer_group(stream_id, topic_id, group_id) + .await + } + ClientWrapper::Http(client) => { + client + .leave_consumer_group(stream_id, topic_id, group_id) + .await + } + ClientWrapper::Tcp(client) => { + client + .leave_consumer_group(stream_id, topic_id, group_id) + .await + } + ClientWrapper::Quic(client) => { + client + .leave_consumer_group(stream_id, topic_id, group_id) + .await + } } } } - async fn leave_consumer_group( - &self, - stream_id: &Identifier, - topic_id: &Identifier, - group_id: &Identifier, - ) -> Result<(), IggyError> { - match self { - ClientWrapper::Iggy(client) => { - client - .leave_consumer_group(stream_id, topic_id, group_id) - .await - } - ClientWrapper::Http(client) => { - client - .leave_consumer_group(stream_id, topic_id, group_id) - .await - } - ClientWrapper::Tcp(client) => { - client - .leave_consumer_group(stream_id, topic_id, group_id) - .await - } - ClientWrapper::TcpSync(client) => { - client - .leave_consumer_group(stream_id, topic_id, group_id) - .await - } - ClientWrapper::Quic(client) => { - client - .leave_consumer_group(stream_id, topic_id, group_id) - .await + #[async_trait] + impl AsyncDrop for ClientWrapper { + async fn async_drop(&mut self) { + match self { + ClientWrapper::Iggy(client) => { + let _ = client.logout_user().await; + } + ClientWrapper::Http(client) => { + let _ = client.logout_user().await; + } + ClientWrapper::Tcp(client) => { + let _ = client.logout_user().await; + } + ClientWrapper::Quic(client) => { + let _ = client.logout_user().await; + } } } } } -#[cfg(feature = "async")] -#[async_trait] -impl AsyncDrop for ClientWrapper { - async fn async_drop(&mut self) { - match self { - ClientWrapper::Iggy(client) => { - let _ = client.logout_user().await; +#[cfg(feature = "sync")] +pub mod sync_impl { + use super::*; + + impl ConsumerGroupClient for ClientWrapper { + fn get_consumer_group( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + group_id: &Identifier, + ) -> Result<Option<ConsumerGroupDetails>, IggyError> { + match self { + ClientWrapper::TcpSync(client) => { + client.get_consumer_group(stream_id, topic_id, group_id) + } + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + + fn get_consumer_groups( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + ) -> Result<Vec<ConsumerGroup>, IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.get_consumer_groups(stream_id, topic_id), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), } - ClientWrapper::Http(client) => { - let _ = client.logout_user().await; + } + + fn create_consumer_group( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + name: &str, + group_id: Option<u32>, + ) -> Result<ConsumerGroupDetails, IggyError> { + match self { + ClientWrapper::TcpSync(client) => { + client.create_consumer_group(stream_id, topic_id, name, group_id) + } + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), } - ClientWrapper::Tcp(client) => { - let _ = client.logout_user().await; + } + + fn delete_consumer_group( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + group_id: &Identifier, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => { + client.delete_consumer_group(stream_id, topic_id, group_id) + } + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), } - ClientWrapper::TcpSync(client) => { - let _ = client.logout_user().await; + } + + fn join_consumer_group( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + group_id: &Identifier, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => { + client.join_consumer_group(stream_id, topic_id, group_id) + } + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), } - ClientWrapper::Quic(client) => { - let _ = client.logout_user().await; + } + + fn leave_consumer_group( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + group_id: &Identifier, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => { + client.leave_consumer_group(stream_id, topic_id, group_id) + } + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), } } } -} -#[cfg(feature = "sync")] -impl Drop for ClientWrapper { - fn drop(&mut self) { - match self { - ClientWrapper::TcpSync(client) => { - let _ = client.logout_user(); + impl Drop for ClientWrapper { + fn drop(&mut self) { + match self { + ClientWrapper::TcpSync(client) => { + let _ = client.logout_user(); + } + _ => return } - _ => return } } -} +} \ No newline at end of file diff --git a/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs b/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs index f7c6a9fe..f545ce1d 100644 --- a/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs +++ b/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs @@ -16,120 +16,162 @@ * under the License. */ -use crate::client_wrappers::client_wrapper::ClientWrapper; -use async_trait::async_trait; +use crate::client_wrappers::ClientWrapper; use iggy_binary_protocol::ConsumerOffsetClient; use iggy_common::{Consumer, ConsumerOffsetInfo, Identifier, IggyError}; -#[maybe_async::maybe_async] -#[async_trait] -impl ConsumerOffsetClient for ClientWrapper { - async fn store_consumer_offset( - &self, - consumer: &Consumer, - stream_id: &Identifier, - topic_id: &Identifier, - partition_id: Option<u32>, - offset: u64, - ) -> Result<(), IggyError> { - match self { - ClientWrapper::Iggy(client) => { - client - .store_consumer_offset(consumer, stream_id, topic_id, partition_id, offset) - .await - } - ClientWrapper::Http(client) => { - client - .store_consumer_offset(consumer, stream_id, topic_id, partition_id, offset) - .await - } - ClientWrapper::Tcp(client) => { - client - .store_consumer_offset(consumer, stream_id, topic_id, partition_id, offset) - .await - } - ClientWrapper::TcpSync(client) => { - client - .store_consumer_offset(consumer, stream_id, topic_id, partition_id, offset) - .await - } - ClientWrapper::Quic(client) => { - client - .store_consumer_offset(consumer, stream_id, topic_id, partition_id, offset) - .await +#[cfg(feature = "async")] +pub mod async_impl { + use super::*; + use async_trait::async_trait; + + #[async_trait] + impl ConsumerOffsetClient for ClientWrapper { + async fn store_consumer_offset( + &self, + consumer: &Consumer, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: Option<u32>, + offset: u64, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => { + client + .store_consumer_offset(consumer, stream_id, topic_id, partition_id, offset) + .await + } + ClientWrapper::Http(client) => { + client + .store_consumer_offset(consumer, stream_id, topic_id, partition_id, offset) + .await + } + ClientWrapper::Tcp(client) => { + client + .store_consumer_offset(consumer, stream_id, topic_id, partition_id, offset) + .await + } + ClientWrapper::Quic(client) => { + client + .store_consumer_offset(consumer, stream_id, topic_id, partition_id, offset) + .await + } } } - } - async fn get_consumer_offset( - &self, - consumer: &Consumer, - stream_id: &Identifier, - topic_id: &Identifier, - partition_id: Option<u32>, - ) -> Result<Option<ConsumerOffsetInfo>, IggyError> { - match self { - ClientWrapper::Iggy(client) => { - client - .get_consumer_offset(consumer, stream_id, topic_id, partition_id) - .await - } - ClientWrapper::Http(client) => { - client - .get_consumer_offset(consumer, stream_id, topic_id, partition_id) - .await - } - ClientWrapper::Tcp(client) => { - client - .get_consumer_offset(consumer, stream_id, topic_id, partition_id) - .await + async fn get_consumer_offset( + &self, + consumer: &Consumer, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: Option<u32>, + ) -> Result<Option<ConsumerOffsetInfo>, IggyError> { + match self { + ClientWrapper::Iggy(client) => { + client + .get_consumer_offset(consumer, stream_id, topic_id, partition_id) + .await + } + ClientWrapper::Http(client) => { + client + .get_consumer_offset(consumer, stream_id, topic_id, partition_id) + .await + } + ClientWrapper::Tcp(client) => { + client + .get_consumer_offset(consumer, stream_id, topic_id, partition_id) + .await + } + ClientWrapper::Quic(client) => { + client + .get_consumer_offset(consumer, stream_id, topic_id, partition_id) + .await + } } - ClientWrapper::TcpSync(client) => { - client - .get_consumer_offset(consumer, stream_id, topic_id, partition_id) - .await - } - ClientWrapper::Quic(client) => { - client - .get_consumer_offset(consumer, stream_id, topic_id, partition_id) - .await + } + + async fn delete_consumer_offset( + &self, + consumer: &Consumer, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: Option<u32>, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => { + client + .delete_consumer_offset(consumer, stream_id, topic_id, partition_id) + .await + } + ClientWrapper::Http(client) => { + client + .delete_consumer_offset(consumer, stream_id, topic_id, partition_id) + .await + } + ClientWrapper::Tcp(client) => { + client + .delete_consumer_offset(consumer, stream_id, topic_id, partition_id) + .await + } + ClientWrapper::Quic(client) => { + client + .delete_consumer_offset(consumer, stream_id, topic_id, partition_id) + .await + } } } } +} - async fn delete_consumer_offset( - &self, - consumer: &Consumer, - stream_id: &Identifier, - topic_id: &Identifier, - partition_id: Option<u32>, - ) -> Result<(), IggyError> { - match self { - ClientWrapper::Iggy(client) => { - client - .delete_consumer_offset(consumer, stream_id, topic_id, partition_id) - .await - } - ClientWrapper::Http(client) => { - client - .delete_consumer_offset(consumer, stream_id, topic_id, partition_id) - .await - } - ClientWrapper::Tcp(client) => { - client - .delete_consumer_offset(consumer, stream_id, topic_id, partition_id) - .await +#[cfg(feature = "sync")] +pub mod sync_impl { + use super::*; + + impl ConsumerOffsetClient for ClientWrapper { + fn store_consumer_offset( + &self, + consumer: &Consumer, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: Option<u32>, + offset: u64, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => { + client.store_consumer_offset(consumer, stream_id, topic_id, partition_id, offset) + } + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), } - ClientWrapper::TcpSync(client) => { - client - .delete_consumer_offset(consumer, stream_id, topic_id, partition_id) - .await + } + + fn get_consumer_offset( + &self, + consumer: &Consumer, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: Option<u32>, + ) -> Result<Option<ConsumerOffsetInfo>, IggyError> { + match self { + ClientWrapper::TcpSync(client) => { + client.get_consumer_offset(consumer, stream_id, topic_id, partition_id) + } + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), } - ClientWrapper::Quic(client) => { - client - .delete_consumer_offset(consumer, stream_id, topic_id, partition_id) - .await + } + + fn delete_consumer_offset( + &self, + consumer: &Consumer, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: Option<u32>, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => { + client.delete_consumer_offset(consumer, stream_id, topic_id, partition_id) + } + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), } } } -} +} \ No newline at end of file diff --git a/core/sdk/src/client_wrappers/binary_message_client.rs b/core/sdk/src/client_wrappers/binary_message_client.rs index f1908de2..bc6ae563 100644 --- a/core/sdk/src/client_wrappers/binary_message_client.rs +++ b/core/sdk/src/client_wrappers/binary_message_client.rs @@ -16,82 +16,167 @@ * under the License. */ -use crate::client_wrappers::client_wrapper::ClientWrapper; -use async_trait::async_trait; +use crate::client_wrappers::ClientWrapper; use iggy_binary_protocol::MessageClient; use iggy_common::{ Consumer, Identifier, IggyError, IggyMessage, Partitioning, PolledMessages, PollingStrategy, }; -#[maybe_async::maybe_async] -#[async_trait] -impl MessageClient for ClientWrapper { - async fn poll_messages( - &self, - stream_id: &Identifier, - topic_id: &Identifier, - partition_id: Option<u32>, - consumer: &Consumer, - strategy: &PollingStrategy, - count: u32, - auto_commit: bool, - ) -> Result<PolledMessages, IggyError> { - match self { - ClientWrapper::Iggy(client) => { - client - .poll_messages( - stream_id, - topic_id, - partition_id, - consumer, - strategy, - count, - auto_commit, - ) - .await - } - ClientWrapper::Http(client) => { - client - .poll_messages( - stream_id, - topic_id, - partition_id, - consumer, - strategy, - count, - auto_commit, - ) - .await +#[cfg(feature = "async")] +pub mod async_impl { + use super::*; + use async_trait::async_trait; + + #[async_trait] + impl MessageClient for ClientWrapper { + async fn poll_messages( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: Option<u32>, + consumer: &Consumer, + strategy: &PollingStrategy, + count: u32, + auto_commit: bool, + ) -> Result<PolledMessages, IggyError> { + match self { + ClientWrapper::Iggy(client) => { + client + .poll_messages( + stream_id, + topic_id, + partition_id, + consumer, + strategy, + count, + auto_commit, + ) + .await + } + ClientWrapper::Http(client) => { + client + .poll_messages( + stream_id, + topic_id, + partition_id, + consumer, + strategy, + count, + auto_commit, + ) + .await + } + ClientWrapper::Tcp(client) => { + client + .poll_messages( + stream_id, + topic_id, + partition_id, + consumer, + strategy, + count, + auto_commit, + ) + .await + } + ClientWrapper::Quic(client) => { + client + .poll_messages( + stream_id, + topic_id, + partition_id, + consumer, + strategy, + count, + auto_commit, + ) + .await + } } - ClientWrapper::Tcp(client) => { - client - .poll_messages( - stream_id, - topic_id, - partition_id, - consumer, - strategy, - count, - auto_commit, - ) - .await + } + + async fn send_messages( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + partitioning: &Partitioning, + messages: &mut [IggyMessage], + ) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => { + client + .send_messages(stream_id, topic_id, partitioning, messages) + .await + } + ClientWrapper::Http(client) => { + client + .send_messages(stream_id, topic_id, partitioning, messages) + .await + } + ClientWrapper::Tcp(client) => { + client + .send_messages(stream_id, topic_id, partitioning, messages) + .await + } + ClientWrapper::Quic(client) => { + client + .send_messages(stream_id, topic_id, partitioning, messages) + .await + } } - ClientWrapper::TcpSync(client) => { - client - .poll_messages( - stream_id, - topic_id, - partition_id, - consumer, - strategy, - count, - auto_commit, - ) - .await + } + + async fn flush_unsaved_buffer( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + partitioning_id: u32, + fsync: bool, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => { + client + .flush_unsaved_buffer(stream_id, topic_id, partitioning_id, fsync) + .await + } + ClientWrapper::Http(client) => { + client + .flush_unsaved_buffer(stream_id, topic_id, partitioning_id, fsync) + .await + } + ClientWrapper::Tcp(client) => { + client + .flush_unsaved_buffer(stream_id, topic_id, partitioning_id, fsync) + .await + } + ClientWrapper::Quic(client) => { + client + .flush_unsaved_buffer(stream_id, topic_id, partitioning_id, fsync) + .await + } } - ClientWrapper::Quic(client) => { - client - .poll_messages( + } + } +} + +#[cfg(feature = "sync")] +pub mod sync_impl { + use super::*; + + impl MessageClient for ClientWrapper { + fn poll_messages( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: Option<u32>, + consumer: &Consumer, + strategy: &PollingStrategy, + count: u32, + auto_commit: bool, + ) -> Result<PolledMessages, IggyError> { + match self { + ClientWrapper::TcpSync(client) => { + client.poll_messages( stream_id, topic_id, partition_id, @@ -100,80 +185,39 @@ impl MessageClient for ClientWrapper { count, auto_commit, ) - .await + } + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), } } - } - async fn send_messages( - &self, - stream_id: &Identifier, - topic_id: &Identifier, - partitioning: &Partitioning, - messages: &mut [IggyMessage], - ) -> Result<(), IggyError> { - match self { - ClientWrapper::Iggy(client) => { - client - .send_messages(stream_id, topic_id, partitioning, messages) - .await - } - ClientWrapper::Http(client) => { - client - .send_messages(stream_id, topic_id, partitioning, messages) - .await - } - ClientWrapper::Tcp(client) => { - client - .send_messages(stream_id, topic_id, partitioning, messages) - .await - } - ClientWrapper::TcpSync(client) => { - client - .send_messages(stream_id, topic_id, partitioning, messages) - .await - } - ClientWrapper::Quic(client) => { - client - .send_messages(stream_id, topic_id, partitioning, messages) - .await + fn send_messages( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + partitioning: &Partitioning, + messages: &mut [IggyMessage], + ) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => { + client.send_messages(stream_id, topic_id, partitioning, messages) + } + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), } } - } - async fn flush_unsaved_buffer( - &self, - stream_id: &Identifier, - topic_id: &Identifier, - partitioning_id: u32, - fsync: bool, - ) -> Result<(), IggyError> { - match self { - ClientWrapper::Iggy(client) => { - client - .flush_unsaved_buffer(stream_id, topic_id, partitioning_id, fsync) - .await - } - ClientWrapper::Http(client) => { - client - .flush_unsaved_buffer(stream_id, topic_id, partitioning_id, fsync) - .await - } - ClientWrapper::Tcp(client) => { - client - .flush_unsaved_buffer(stream_id, topic_id, partitioning_id, fsync) - .await - } - ClientWrapper::TcpSync(client) => { - client - .flush_unsaved_buffer(stream_id, topic_id, partitioning_id, fsync) - .await - } - ClientWrapper::Quic(client) => { - client - .flush_unsaved_buffer(stream_id, topic_id, partitioning_id, fsync) - .await + fn flush_unsaved_buffer( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + partitioning_id: u32, + fsync: bool, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => { + client.flush_unsaved_buffer(stream_id, topic_id, partitioning_id, fsync) + } + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), } } } -} +} \ No newline at end of file diff --git a/core/sdk/src/client_wrappers/binary_partition_client.rs b/core/sdk/src/client_wrappers/binary_partition_client.rs index fc6fcce7..fe1d1cb9 100644 --- a/core/sdk/src/client_wrappers/binary_partition_client.rs +++ b/core/sdk/src/client_wrappers/binary_partition_client.rs @@ -16,81 +16,110 @@ * under the License. */ -use crate::client_wrappers::client_wrapper::ClientWrapper; -use async_trait::async_trait; +use crate::client_wrappers::ClientWrapper; use iggy_binary_protocol::PartitionClient; use iggy_common::{Identifier, IggyError}; -#[maybe_async::maybe_async] -#[async_trait] -impl PartitionClient for ClientWrapper { - async fn create_partitions( - &self, - stream_id: &Identifier, - topic_id: &Identifier, - partitions_count: u32, - ) -> Result<(), IggyError> { - match self { - ClientWrapper::Iggy(client) => { - client - .create_partitions(stream_id, topic_id, partitions_count) - .await - } - ClientWrapper::Http(client) => { - client - .create_partitions(stream_id, topic_id, partitions_count) - .await - } - ClientWrapper::Tcp(client) => { - client - .create_partitions(stream_id, topic_id, partitions_count) - .await - } - ClientWrapper::Quic(client) => { - client - .create_partitions(stream_id, topic_id, partitions_count) - .await +#[cfg(feature = "async")] +pub mod async_impl { + use super::*; + use async_trait::async_trait; + + #[async_trait] + impl PartitionClient for ClientWrapper { + async fn create_partitions( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + partitions_count: u32, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => { + client + .create_partitions(stream_id, topic_id, partitions_count) + .await + } + ClientWrapper::Http(client) => { + client + .create_partitions(stream_id, topic_id, partitions_count) + .await + } + ClientWrapper::Tcp(client) => { + client + .create_partitions(stream_id, topic_id, partitions_count) + .await + } + ClientWrapper::Quic(client) => { + client + .create_partitions(stream_id, topic_id, partitions_count) + .await + } } - ClientWrapper::TcpSync(client) => { - client - .create_partitions(stream_id, topic_id, partitions_count) - .await + } + + async fn delete_partitions( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + partitions_count: u32, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => { + client + .delete_partitions(stream_id, topic_id, partitions_count) + .await + } + ClientWrapper::Http(client) => { + client + .delete_partitions(stream_id, topic_id, partitions_count) + .await + } + ClientWrapper::Tcp(client) => { + client + .delete_partitions(stream_id, topic_id, partitions_count) + .await + } + ClientWrapper::Quic(client) => { + client + .delete_partitions(stream_id, topic_id, partitions_count) + .await + } } } } +} - async fn delete_partitions( - &self, - stream_id: &Identifier, - topic_id: &Identifier, - partitions_count: u32, - ) -> Result<(), IggyError> { - match self { - ClientWrapper::Iggy(client) => { - client - .delete_partitions(stream_id, topic_id, partitions_count) - .await - } - ClientWrapper::Http(client) => { - client - .delete_partitions(stream_id, topic_id, partitions_count) - .await - } - ClientWrapper::Tcp(client) => { - client - .delete_partitions(stream_id, topic_id, partitions_count) - .await - } - ClientWrapper::Quic(client) => { - client - .delete_partitions(stream_id, topic_id, partitions_count) - .await +#[cfg(feature = "sync")] +pub mod sync_impl { + use super::*; + + impl PartitionClient for ClientWrapper { + fn create_partitions( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + partitions_count: u32, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => { + client.create_partitions(stream_id, topic_id, partitions_count) + } + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), } - ClientWrapper::TcpSync(client) => { - client - .delete_partitions(stream_id, topic_id, partitions_count) - .await + } + + fn delete_partitions( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + partitions_count: u32, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => { + client.delete_partitions(stream_id, topic_id, partitions_count) + } + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), } } } -} +} \ No newline at end of file diff --git a/core/sdk/src/client_wrappers/binary_personal_access_token_client.rs b/core/sdk/src/client_wrappers/binary_personal_access_token_client.rs index 119c9204..6efca014 100644 --- a/core/sdk/src/client_wrappers/binary_personal_access_token_client.rs +++ b/core/sdk/src/client_wrappers/binary_personal_access_token_client.rs @@ -16,61 +16,103 @@ * under the License. */ -use crate::client_wrappers::client_wrapper::ClientWrapper; -use async_trait::async_trait; +use crate::client_wrappers::ClientWrapper; use iggy_binary_protocol::PersonalAccessTokenClient; use iggy_common::{ IdentityInfo, IggyError, PersonalAccessTokenExpiry, PersonalAccessTokenInfo, RawPersonalAccessToken, }; -#[maybe_async::maybe_async] -#[async_trait] -impl PersonalAccessTokenClient for ClientWrapper { - async fn get_personal_access_tokens(&self) -> Result<Vec<PersonalAccessTokenInfo>, IggyError> { - match self { - ClientWrapper::Iggy(client) => client.get_personal_access_tokens().await, - ClientWrapper::Http(client) => client.get_personal_access_tokens().await, - ClientWrapper::Tcp(client) => client.get_personal_access_tokens().await, - ClientWrapper::Quic(client) => client.get_personal_access_tokens().await, - ClientWrapper::TcpSync(client) => client.get_personal_access_tokens().await, +#[cfg(feature = "async")] +pub mod async_impl { + use super::*; + use async_trait::async_trait; + + #[async_trait] + impl PersonalAccessTokenClient for ClientWrapper { + async fn get_personal_access_tokens(&self) -> Result<Vec<PersonalAccessTokenInfo>, IggyError> { + match self { + ClientWrapper::Iggy(client) => client.get_personal_access_tokens().await, + ClientWrapper::Http(client) => client.get_personal_access_tokens().await, + ClientWrapper::Tcp(client) => client.get_personal_access_tokens().await, + ClientWrapper::Quic(client) => client.get_personal_access_tokens().await, + } } - } - async fn create_personal_access_token( - &self, - name: &str, - expiry: PersonalAccessTokenExpiry, - ) -> Result<RawPersonalAccessToken, IggyError> { - match self { - ClientWrapper::Iggy(client) => client.create_personal_access_token(name, expiry).await, - ClientWrapper::Http(client) => client.create_personal_access_token(name, expiry).await, - ClientWrapper::Tcp(client) => client.create_personal_access_token(name, expiry).await, - ClientWrapper::Quic(client) => client.create_personal_access_token(name, expiry).await, - ClientWrapper::TcpSync(client) => client.create_personal_access_token(name, expiry).await, + async fn create_personal_access_token( + &self, + name: &str, + expiry: PersonalAccessTokenExpiry, + ) -> Result<RawPersonalAccessToken, IggyError> { + match self { + ClientWrapper::Iggy(client) => client.create_personal_access_token(name, expiry).await, + ClientWrapper::Http(client) => client.create_personal_access_token(name, expiry).await, + ClientWrapper::Tcp(client) => client.create_personal_access_token(name, expiry).await, + ClientWrapper::Quic(client) => client.create_personal_access_token(name, expiry).await, + } } - } - async fn delete_personal_access_token(&self, name: &str) -> Result<(), IggyError> { - match self { - ClientWrapper::Iggy(client) => client.delete_personal_access_token(name).await, - ClientWrapper::Http(client) => client.delete_personal_access_token(name).await, - ClientWrapper::Tcp(client) => client.delete_personal_access_token(name).await, - ClientWrapper::Quic(client) => client.delete_personal_access_token(name).await, - ClientWrapper::TcpSync(client) => client.delete_personal_access_token(name).await, + async fn delete_personal_access_token(&self, name: &str) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => client.delete_personal_access_token(name).await, + ClientWrapper::Http(client) => client.delete_personal_access_token(name).await, + ClientWrapper::Tcp(client) => client.delete_personal_access_token(name).await, + ClientWrapper::Quic(client) => client.delete_personal_access_token(name).await, + } } - } - async fn login_with_personal_access_token( - &self, - token: &str, - ) -> Result<IdentityInfo, IggyError> { - match self { - ClientWrapper::Iggy(client) => client.login_with_personal_access_token(token).await, - ClientWrapper::Http(client) => client.login_with_personal_access_token(token).await, - ClientWrapper::Tcp(client) => client.login_with_personal_access_token(token).await, - ClientWrapper::Quic(client) => client.login_with_personal_access_token(token).await, - ClientWrapper::TcpSync(client) => client.login_with_personal_access_token(token).await, + async fn login_with_personal_access_token( + &self, + token: &str, + ) -> Result<IdentityInfo, IggyError> { + match self { + ClientWrapper::Iggy(client) => client.login_with_personal_access_token(token).await, + ClientWrapper::Http(client) => client.login_with_personal_access_token(token).await, + ClientWrapper::Tcp(client) => client.login_with_personal_access_token(token).await, + ClientWrapper::Quic(client) => client.login_with_personal_access_token(token).await, + } } } } + +#[cfg(feature = "sync")] +pub mod sync_impl { + use super::*; + + impl PersonalAccessTokenClient for ClientWrapper { + fn get_personal_access_tokens(&self) -> Result<Vec<PersonalAccessTokenInfo>, IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.get_personal_access_tokens(), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + + fn create_personal_access_token( + &self, + name: &str, + expiry: PersonalAccessTokenExpiry, + ) -> Result<RawPersonalAccessToken, IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.create_personal_access_token(name, expiry), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + + fn delete_personal_access_token(&self, name: &str) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.delete_personal_access_token(name), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + + fn login_with_personal_access_token( + &self, + token: &str, + ) -> Result<IdentityInfo, IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.login_with_personal_access_token(token), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + } +} \ No newline at end of file diff --git a/core/sdk/src/client_wrappers/binary_segment_client.rs b/core/sdk/src/client_wrappers/binary_segment_client.rs index b42e76fe..1ffcd543 100644 --- a/core/sdk/src/client_wrappers/binary_segment_client.rs +++ b/core/sdk/src/client_wrappers/binary_segment_client.rs @@ -16,47 +16,68 @@ * under the License. */ -use crate::client_wrappers::client_wrapper::ClientWrapper; -use async_trait::async_trait; +use crate::client_wrappers::ClientWrapper; use iggy_binary_protocol::SegmentClient; use iggy_common::{Identifier, IggyError}; -#[maybe_async::maybe_async] -#[async_trait] -impl SegmentClient for ClientWrapper { - async fn delete_segments( - &self, - stream_id: &Identifier, - topic_id: &Identifier, - partition_id: u32, - segments_count: u32, - ) -> Result<(), IggyError> { - match self { - ClientWrapper::Iggy(client) => { - client - .delete_segments(stream_id, topic_id, partition_id, segments_count) - .await - } - ClientWrapper::Http(client) => { - client - .delete_segments(stream_id, topic_id, partition_id, segments_count) - .await - } - ClientWrapper::Tcp(client) => { - client - .delete_segments(stream_id, topic_id, partition_id, segments_count) - .await - } - ClientWrapper::Quic(client) => { - client - .delete_segments(stream_id, topic_id, partition_id, segments_count) - .await - } - ClientWrapper::TcpSync(client) => { - client - .delete_segments(stream_id, topic_id, partition_id, segments_count) - .await +#[cfg(feature = "async")] +pub mod async_impl { + use super::*; + use async_trait::async_trait; + + #[async_trait] + impl SegmentClient for ClientWrapper { + async fn delete_segments( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: u32, + segments_count: u32, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => { + client + .delete_segments(stream_id, topic_id, partition_id, segments_count) + .await + } + ClientWrapper::Http(client) => { + client + .delete_segments(stream_id, topic_id, partition_id, segments_count) + .await + } + ClientWrapper::Tcp(client) => { + client + .delete_segments(stream_id, topic_id, partition_id, segments_count) + .await + } + ClientWrapper::Quic(client) => { + client + .delete_segments(stream_id, topic_id, partition_id, segments_count) + .await + } } } } } + +#[cfg(feature = "sync")] +pub mod sync_impl { + use super::*; + + impl SegmentClient for ClientWrapper { + fn delete_segments( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: u32, + segments_count: u32, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => { + client.delete_segments(stream_id, topic_id, partition_id, segments_count) + } + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + } +} \ No newline at end of file diff --git a/core/sdk/src/client_wrappers/binary_stream_client.rs b/core/sdk/src/client_wrappers/binary_stream_client.rs index cd8ce8d4..a13e4406 100644 --- a/core/sdk/src/client_wrappers/binary_stream_client.rs +++ b/core/sdk/src/client_wrappers/binary_stream_client.rs @@ -16,75 +16,126 @@ * under the License. */ -use crate::client_wrappers::client_wrapper::ClientWrapper; -use async_trait::async_trait; +use crate::client_wrappers::ClientWrapper; use iggy_binary_protocol::StreamClient; use iggy_common::{Identifier, IggyError, Stream, StreamDetails}; -#[maybe_async::maybe_async] -#[async_trait] -impl StreamClient for ClientWrapper { - async fn get_stream(&self, stream_id: &Identifier) -> Result<Option<StreamDetails>, IggyError> { - match self { - ClientWrapper::Iggy(client) => client.get_stream(stream_id).await, - ClientWrapper::Http(client) => client.get_stream(stream_id).await, - ClientWrapper::Tcp(client) => client.get_stream(stream_id).await, - ClientWrapper::Quic(client) => client.get_stream(stream_id).await, - ClientWrapper::TcpSync(client) => client.get_stream(stream_id).await, +#[cfg(feature = "async")] +pub mod async_impl { + use super::*; + use async_trait::async_trait; + + #[async_trait] + impl StreamClient for ClientWrapper { + async fn get_stream(&self, stream_id: &Identifier) -> Result<Option<StreamDetails>, IggyError> { + match self { + ClientWrapper::Iggy(client) => client.get_stream(stream_id).await, + ClientWrapper::Http(client) => client.get_stream(stream_id).await, + ClientWrapper::Tcp(client) => client.get_stream(stream_id).await, + ClientWrapper::Quic(client) => client.get_stream(stream_id).await, + } } - } - async fn get_streams(&self) -> Result<Vec<Stream>, IggyError> { - match self { - ClientWrapper::Iggy(client) => client.get_streams().await, - ClientWrapper::Http(client) => client.get_streams().await, - ClientWrapper::Tcp(client) => client.get_streams().await, - ClientWrapper::Quic(client) => client.get_streams().await, - ClientWrapper::TcpSync(client) => client.get_streams().await, + async fn get_streams(&self) -> Result<Vec<Stream>, IggyError> { + match self { + ClientWrapper::Iggy(client) => client.get_streams().await, + ClientWrapper::Http(client) => client.get_streams().await, + ClientWrapper::Tcp(client) => client.get_streams().await, + ClientWrapper::Quic(client) => client.get_streams().await, + } } - } - async fn create_stream( - &self, - name: &str, - stream_id: Option<u32>, - ) -> Result<StreamDetails, IggyError> { - match self { - ClientWrapper::Iggy(client) => client.create_stream(name, stream_id).await, - ClientWrapper::Http(client) => client.create_stream(name, stream_id).await, - ClientWrapper::Tcp(client) => client.create_stream(name, stream_id).await, - ClientWrapper::Quic(client) => client.create_stream(name, stream_id).await, - ClientWrapper::TcpSync(client) => client.create_stream(name, stream_id).await, + async fn create_stream( + &self, + name: &str, + stream_id: Option<u32>, + ) -> Result<StreamDetails, IggyError> { + match self { + ClientWrapper::Iggy(client) => client.create_stream(name, stream_id).await, + ClientWrapper::Http(client) => client.create_stream(name, stream_id).await, + ClientWrapper::Tcp(client) => client.create_stream(name, stream_id).await, + ClientWrapper::Quic(client) => client.create_stream(name, stream_id).await, + } } - } - async fn update_stream(&self, stream_id: &Identifier, name: &str) -> Result<(), IggyError> { - match self { - ClientWrapper::Iggy(client) => client.update_stream(stream_id, name).await, - ClientWrapper::Http(client) => client.update_stream(stream_id, name).await, - ClientWrapper::Tcp(client) => client.update_stream(stream_id, name).await, - ClientWrapper::Quic(client) => client.update_stream(stream_id, name).await, - ClientWrapper::TcpSync(client) => client.update_stream(stream_id, name).await, + async fn update_stream(&self, stream_id: &Identifier, name: &str) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => client.update_stream(stream_id, name).await, + ClientWrapper::Http(client) => client.update_stream(stream_id, name).await, + ClientWrapper::Tcp(client) => client.update_stream(stream_id, name).await, + ClientWrapper::Quic(client) => client.update_stream(stream_id, name).await, + } } - } - async fn delete_stream(&self, stream_id: &Identifier) -> Result<(), IggyError> { - match self { - ClientWrapper::Iggy(client) => client.delete_stream(stream_id).await, - ClientWrapper::Http(client) => client.delete_stream(stream_id).await, - ClientWrapper::Tcp(client) => client.delete_stream(stream_id).await, - ClientWrapper::Quic(client) => client.delete_stream(stream_id).await, - ClientWrapper::TcpSync(client) => client.delete_stream(stream_id).await, + async fn delete_stream(&self, stream_id: &Identifier) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => client.delete_stream(stream_id).await, + ClientWrapper::Http(client) => client.delete_stream(stream_id).await, + ClientWrapper::Tcp(client) => client.delete_stream(stream_id).await, + ClientWrapper::Quic(client) => client.delete_stream(stream_id).await, + } } - } - async fn purge_stream(&self, stream_id: &Identifier) -> Result<(), IggyError> { - match self { - ClientWrapper::Iggy(client) => client.purge_stream(stream_id).await, - ClientWrapper::Http(client) => client.purge_stream(stream_id).await, - ClientWrapper::Tcp(client) => client.purge_stream(stream_id).await, - ClientWrapper::Quic(client) => client.purge_stream(stream_id).await, - ClientWrapper::TcpSync(client) => client.purge_stream(stream_id).await, + async fn purge_stream(&self, stream_id: &Identifier) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => client.purge_stream(stream_id).await, + ClientWrapper::Http(client) => client.purge_stream(stream_id).await, + ClientWrapper::Tcp(client) => client.purge_stream(stream_id).await, + ClientWrapper::Quic(client) => client.purge_stream(stream_id).await, + } } } } + +#[cfg(feature = "sync")] +pub mod sync_impl { + use super::*; + + impl StreamClient for ClientWrapper { + fn get_stream(&self, stream_id: &Identifier) -> Result<Option<StreamDetails>, IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.get_stream(stream_id), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + + fn get_streams(&self) -> Result<Vec<Stream>, IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.get_streams(), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + + fn create_stream( + &self, + name: &str, + stream_id: Option<u32>, + ) -> Result<StreamDetails, IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.create_stream(name, stream_id), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + + fn update_stream(&self, stream_id: &Identifier, name: &str) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.update_stream(stream_id, name), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + + fn delete_stream(&self, stream_id: &Identifier) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.delete_stream(stream_id), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + + fn purge_stream(&self, stream_id: &Identifier) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.purge_stream(stream_id), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + } +} \ No newline at end of file diff --git a/core/sdk/src/client_wrappers/binary_system_client.rs b/core/sdk/src/client_wrappers/binary_system_client.rs index e9718bfd..e29305fb 100644 --- a/core/sdk/src/client_wrappers/binary_system_client.rs +++ b/core/sdk/src/client_wrappers/binary_system_client.rs @@ -16,88 +16,145 @@ * under the License. */ -use crate::client_wrappers::client_wrapper::ClientWrapper; -use async_trait::async_trait; +use crate::client_wrappers::ClientWrapper; use iggy_binary_protocol::SystemClient; use iggy_common::{ ClientInfo, ClientInfoDetails, IggyDuration, IggyError, Snapshot, SnapshotCompression, Stats, SystemSnapshotType, }; -#[maybe_async::maybe_async] -#[async_trait] -impl SystemClient for ClientWrapper { - async fn get_stats(&self) -> Result<Stats, IggyError> { - match self { - ClientWrapper::Iggy(client) => client.get_stats().await, - ClientWrapper::Http(client) => client.get_stats().await, - ClientWrapper::Tcp(client) => client.get_stats().await, - ClientWrapper::Quic(client) => client.get_stats().await, - ClientWrapper::TcpSync(client) => client.get_stats().await, +#[cfg(feature = "async")] +pub mod async_impl { + use super::*; + use async_trait::async_trait; + + #[async_trait] + impl SystemClient for ClientWrapper { + async fn get_stats(&self) -> Result<Stats, IggyError> { + match self { + ClientWrapper::Iggy(client) => client.get_stats().await, + ClientWrapper::Http(client) => client.get_stats().await, + ClientWrapper::Tcp(client) => client.get_stats().await, + ClientWrapper::Quic(client) => client.get_stats().await, + } } - } - async fn get_me(&self) -> Result<ClientInfoDetails, IggyError> { - match self { - ClientWrapper::Iggy(client) => client.get_me().await, - ClientWrapper::Http(client) => client.get_me().await, - ClientWrapper::Tcp(client) => client.get_me().await, - ClientWrapper::Quic(client) => client.get_me().await, - ClientWrapper::TcpSync(client) => client.get_me().await, + async fn get_me(&self) -> Result<ClientInfoDetails, IggyError> { + match self { + ClientWrapper::Iggy(client) => client.get_me().await, + ClientWrapper::Http(client) => client.get_me().await, + ClientWrapper::Tcp(client) => client.get_me().await, + ClientWrapper::Quic(client) => client.get_me().await, + } } - } - async fn get_client(&self, client_id: u32) -> Result<Option<ClientInfoDetails>, IggyError> { - match self { - ClientWrapper::Iggy(client) => client.get_client(client_id).await, - ClientWrapper::Http(client) => client.get_client(client_id).await, - ClientWrapper::Tcp(client) => client.get_client(client_id).await, - ClientWrapper::Quic(client) => client.get_client(client_id).await, - ClientWrapper::TcpSync(client) => client.get_client(client_id).await, + async fn get_client(&self, client_id: u32) -> Result<Option<ClientInfoDetails>, IggyError> { + match self { + ClientWrapper::Iggy(client) => client.get_client(client_id).await, + ClientWrapper::Http(client) => client.get_client(client_id).await, + ClientWrapper::Tcp(client) => client.get_client(client_id).await, + ClientWrapper::Quic(client) => client.get_client(client_id).await, + } } - } - async fn get_clients(&self) -> Result<Vec<ClientInfo>, IggyError> { - match self { - ClientWrapper::Iggy(client) => client.get_clients().await, - ClientWrapper::Http(client) => client.get_clients().await, - ClientWrapper::Tcp(client) => client.get_clients().await, - ClientWrapper::Quic(client) => client.get_clients().await, - ClientWrapper::TcpSync(client) => client.get_clients().await, + async fn get_clients(&self) -> Result<Vec<ClientInfo>, IggyError> { + match self { + ClientWrapper::Iggy(client) => client.get_clients().await, + ClientWrapper::Http(client) => client.get_clients().await, + ClientWrapper::Tcp(client) => client.get_clients().await, + ClientWrapper::Quic(client) => client.get_clients().await, + } } - } - async fn ping(&self) -> Result<(), IggyError> { - match self { - ClientWrapper::Iggy(client) => client.ping().await, - ClientWrapper::Http(client) => client.ping().await, - ClientWrapper::Tcp(client) => client.ping().await, - ClientWrapper::Quic(client) => client.ping().await, - ClientWrapper::TcpSync(client) => client.ping().await, + async fn ping(&self) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => client.ping().await, + ClientWrapper::Http(client) => client.ping().await, + ClientWrapper::Tcp(client) => client.ping().await, + ClientWrapper::Quic(client) => client.ping().await, + } } - } - async fn heartbeat_interval(&self) -> IggyDuration { - match self { - ClientWrapper::Iggy(client) => client.heartbeat_interval().await, - ClientWrapper::Http(client) => client.heartbeat_interval().await, - ClientWrapper::Tcp(client) => client.heartbeat_interval().await, - ClientWrapper::Quic(client) => client.heartbeat_interval().await, - ClientWrapper::TcpSync(client) => client.heartbeat_interval().await, + async fn heartbeat_interval(&self) -> IggyDuration { + match self { + ClientWrapper::Iggy(client) => client.heartbeat_interval().await, + ClientWrapper::Http(client) => client.heartbeat_interval().await, + ClientWrapper::Tcp(client) => client.heartbeat_interval().await, + ClientWrapper::Quic(client) => client.heartbeat_interval().await, + } } - } - async fn snapshot( - &self, - compression: SnapshotCompression, - snapshot_types: Vec<SystemSnapshotType>, - ) -> Result<Snapshot, IggyError> { - match self { - ClientWrapper::Iggy(client) => client.snapshot(compression, snapshot_types).await, - ClientWrapper::Http(client) => client.snapshot(compression, snapshot_types).await, - ClientWrapper::Tcp(client) => client.snapshot(compression, snapshot_types).await, - ClientWrapper::Quic(client) => client.snapshot(compression, snapshot_types).await, - ClientWrapper::TcpSync(client) => client.snapshot(compression, snapshot_types).await, + async fn snapshot( + &self, + compression: SnapshotCompression, + snapshot_types: Vec<SystemSnapshotType>, + ) -> Result<Snapshot, IggyError> { + match self { + ClientWrapper::Iggy(client) => client.snapshot(compression, snapshot_types).await, + ClientWrapper::Http(client) => client.snapshot(compression, snapshot_types).await, + ClientWrapper::Tcp(client) => client.snapshot(compression, snapshot_types).await, + ClientWrapper::Quic(client) => client.snapshot(compression, snapshot_types).await, + } } } } + +#[cfg(feature = "sync")] +pub mod sync_impl { + use super::*; + + impl SystemClient for ClientWrapper { + fn get_stats(&self) -> Result<Stats, IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.get_stats(), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + + fn get_me(&self) -> Result<ClientInfoDetails, IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.get_me(), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + + fn get_client(&self, client_id: u32) -> Result<Option<ClientInfoDetails>, IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.get_client(client_id), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + + fn get_clients(&self) -> Result<Vec<ClientInfo>, IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.get_clients(), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + + fn ping(&self) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.ping(), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + + fn heartbeat_interval(&self) -> IggyDuration { + match self { + ClientWrapper::TcpSync(client) => client.heartbeat_interval(), + ClientWrapper::Iggy(_) => IggyDuration::default(), + } + } + + fn snapshot( + &self, + compression: SnapshotCompression, + snapshot_types: Vec<SystemSnapshotType>, + ) -> Result<Snapshot, IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.snapshot(compression, snapshot_types), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + } +} \ No newline at end of file diff --git a/core/sdk/src/client_wrappers/binary_topic_client.rs b/core/sdk/src/client_wrappers/binary_topic_client.rs index 55fbdcca..e3da23df 100644 --- a/core/sdk/src/client_wrappers/binary_topic_client.rs +++ b/core/sdk/src/client_wrappers/binary_topic_client.rs @@ -16,191 +16,270 @@ * under the License. */ -use crate::client_wrappers::client_wrapper::ClientWrapper; -use async_trait::async_trait; +use crate::client_wrappers::ClientWrapper; use iggy_binary_protocol::TopicClient; use iggy_common::{ CompressionAlgorithm, Identifier, IggyError, IggyExpiry, MaxTopicSize, Topic, TopicDetails, }; -#[maybe_async::maybe_async] -#[async_trait] -impl TopicClient for ClientWrapper { - async fn get_topic( - &self, - stream_id: &Identifier, - topic_id: &Identifier, - ) -> Result<Option<TopicDetails>, IggyError> { - match self { - ClientWrapper::Iggy(client) => client.get_topic(stream_id, topic_id).await, - ClientWrapper::Http(client) => client.get_topic(stream_id, topic_id).await, - ClientWrapper::Tcp(client) => client.get_topic(stream_id, topic_id).await, - ClientWrapper::Quic(client) => client.get_topic(stream_id, topic_id).await, - ClientWrapper::TcpSync(client) => client.get_topic(stream_id, topic_id).await, - } - } +#[cfg(feature = "async")] +pub mod async_impl { + use super::*; + use async_trait::async_trait; - async fn get_topics(&self, stream_id: &Identifier) -> Result<Vec<Topic>, IggyError> { - match self { - ClientWrapper::Iggy(client) => client.get_topics(stream_id).await, - ClientWrapper::Http(client) => client.get_topics(stream_id).await, - ClientWrapper::Tcp(client) => client.get_topics(stream_id).await, - ClientWrapper::Quic(client) => client.get_topics(stream_id).await, - ClientWrapper::TcpSync(client) => client.get_topics(stream_id).await, + #[async_trait] + impl TopicClient for ClientWrapper { + async fn get_topic( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + ) -> Result<Option<TopicDetails>, IggyError> { + match self { + ClientWrapper::Iggy(client) => client.get_topic(stream_id, topic_id).await, + ClientWrapper::Http(client) => client.get_topic(stream_id, topic_id).await, + ClientWrapper::Tcp(client) => client.get_topic(stream_id, topic_id).await, + ClientWrapper::Quic(client) => client.get_topic(stream_id, topic_id).await, + } } - } - async fn create_topic( - &self, - stream_id: &Identifier, - name: &str, - partitions_count: u32, - compression_algorithm: CompressionAlgorithm, - replication_factor: Option<u8>, - topic_id: Option<u32>, - message_expiry: IggyExpiry, - max_topic_size: MaxTopicSize, - ) -> Result<TopicDetails, IggyError> { - match self { - ClientWrapper::Iggy(client) => { - client - .create_topic( - stream_id, - name, - partitions_count, - compression_algorithm, - replication_factor, - topic_id, - message_expiry, - max_topic_size, - ) - .await + async fn get_topics(&self, stream_id: &Identifier) -> Result<Vec<Topic>, IggyError> { + match self { + ClientWrapper::Iggy(client) => client.get_topics(stream_id).await, + ClientWrapper::Http(client) => client.get_topics(stream_id).await, + ClientWrapper::Tcp(client) => client.get_topics(stream_id).await, + ClientWrapper::Quic(client) => client.get_topics(stream_id).await, } - ClientWrapper::Http(client) => { - client - .create_topic( - stream_id, - name, - partitions_count, - compression_algorithm, - replication_factor, - topic_id, - message_expiry, - max_topic_size, - ) - .await + } + + async fn create_topic( + &self, + stream_id: &Identifier, + name: &str, + partitions_count: u32, + compression_algorithm: CompressionAlgorithm, + replication_factor: Option<u8>, + topic_id: Option<u32>, + message_expiry: IggyExpiry, + max_topic_size: MaxTopicSize, + ) -> Result<TopicDetails, IggyError> { + match self { + ClientWrapper::Iggy(client) => { + client + .create_topic( + stream_id, + name, + partitions_count, + compression_algorithm, + replication_factor, + topic_id, + message_expiry, + max_topic_size, + ) + .await + } + ClientWrapper::Http(client) => { + client + .create_topic( + stream_id, + name, + partitions_count, + compression_algorithm, + replication_factor, + topic_id, + message_expiry, + max_topic_size, + ) + .await + } + ClientWrapper::Tcp(client) => { + client + .create_topic( + stream_id, + name, + partitions_count, + compression_algorithm, + replication_factor, + topic_id, + message_expiry, + max_topic_size, + ) + .await + } + ClientWrapper::Quic(client) => { + client + .create_topic( + stream_id, + name, + partitions_count, + compression_algorithm, + replication_factor, + topic_id, + message_expiry, + max_topic_size, + ) + .await + } } - ClientWrapper::Tcp(client) => { - client - .create_topic( - stream_id, - name, - partitions_count, - compression_algorithm, - replication_factor, - topic_id, - message_expiry, - max_topic_size, - ) - .await + } + + async fn update_topic( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + name: &str, + compression_algorithm: CompressionAlgorithm, + replication_factor: Option<u8>, + message_expiry: IggyExpiry, + max_topic_size: MaxTopicSize, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => { + client + .update_topic( + stream_id, + topic_id, + name, + compression_algorithm, + replication_factor, + message_expiry, + max_topic_size, + ) + .await + } + ClientWrapper::Http(client) => { + client + .update_topic( + stream_id, + topic_id, + name, + compression_algorithm, + replication_factor, + message_expiry, + max_topic_size, + ) + .await + } + ClientWrapper::Tcp(client) => { + client + .update_topic( + stream_id, + topic_id, + name, + compression_algorithm, + replication_factor, + message_expiry, + max_topic_size, + ) + .await + } + ClientWrapper::Quic(client) => { + client + .update_topic( + stream_id, + topic_id, + name, + compression_algorithm, + replication_factor, + message_expiry, + max_topic_size, + ) + .await + } } - ClientWrapper::Quic(client) => { - client - .create_topic( - stream_id, - name, - partitions_count, - compression_algorithm, - replication_factor, - topic_id, - message_expiry, - max_topic_size, - ) - .await + } + + async fn delete_topic( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => client.delete_topic(stream_id, topic_id).await, + ClientWrapper::Http(client) => client.delete_topic(stream_id, topic_id).await, + ClientWrapper::Tcp(client) => client.delete_topic(stream_id, topic_id).await, + ClientWrapper::Quic(client) => client.delete_topic(stream_id, topic_id).await, } - ClientWrapper::TcpSync(client) => { - client - .create_topic( - stream_id, - name, - partitions_count, - compression_algorithm, - replication_factor, - topic_id, - message_expiry, - max_topic_size, - ) - .await + } + + async fn purge_topic( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => client.purge_topic(stream_id, topic_id).await, + ClientWrapper::Http(client) => client.purge_topic(stream_id, topic_id).await, + ClientWrapper::Tcp(client) => client.purge_topic(stream_id, topic_id).await, + ClientWrapper::Quic(client) => client.purge_topic(stream_id, topic_id).await, } } } +} - async fn update_topic( - &self, - stream_id: &Identifier, - topic_id: &Identifier, - name: &str, - compression_algorithm: CompressionAlgorithm, - replication_factor: Option<u8>, - message_expiry: IggyExpiry, - max_topic_size: MaxTopicSize, - ) -> Result<(), IggyError> { - match self { - ClientWrapper::Iggy(client) => { - client - .update_topic( - stream_id, - topic_id, - name, - compression_algorithm, - replication_factor, - message_expiry, - max_topic_size, - ) - .await +#[cfg(feature = "sync")] +pub mod sync_impl { + use super::*; + + impl TopicClient for ClientWrapper { + fn get_topic( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + ) -> Result<Option<TopicDetails>, IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.get_topic(stream_id, topic_id), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), } - ClientWrapper::Http(client) => { - client - .update_topic( - stream_id, - topic_id, - name, - compression_algorithm, - replication_factor, - message_expiry, - max_topic_size, - ) - .await + } + + fn get_topics(&self, stream_id: &Identifier) -> Result<Vec<Topic>, IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.get_topics(stream_id), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), } - ClientWrapper::Tcp(client) => { - client - .update_topic( + } + + fn create_topic( + &self, + stream_id: &Identifier, + name: &str, + partitions_count: u32, + compression_algorithm: CompressionAlgorithm, + replication_factor: Option<u8>, + topic_id: Option<u32>, + message_expiry: IggyExpiry, + max_topic_size: MaxTopicSize, + ) -> Result<TopicDetails, IggyError> { + match self { + ClientWrapper::TcpSync(client) => { + client.create_topic( stream_id, - topic_id, name, + partitions_count, compression_algorithm, replication_factor, - message_expiry, - max_topic_size, - ) - .await - } - ClientWrapper::Quic(client) => { - client - .update_topic( - stream_id, topic_id, - name, - compression_algorithm, - replication_factor, message_expiry, max_topic_size, ) - .await + } + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), } - ClientWrapper::TcpSync(client) => { - client - .update_topic( + } + + fn update_topic( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + name: &str, + compression_algorithm: CompressionAlgorithm, + replication_factor: Option<u8>, + message_expiry: IggyExpiry, + max_topic_size: MaxTopicSize, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => { + client.update_topic( stream_id, topic_id, name, @@ -209,36 +288,31 @@ impl TopicClient for ClientWrapper { message_expiry, max_topic_size, ) - .await + } + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), } } - } - async fn delete_topic( - &self, - stream_id: &Identifier, - topic_id: &Identifier, - ) -> Result<(), IggyError> { - match self { - ClientWrapper::Iggy(client) => client.delete_topic(stream_id, topic_id).await, - ClientWrapper::Http(client) => client.delete_topic(stream_id, topic_id).await, - ClientWrapper::Tcp(client) => client.delete_topic(stream_id, topic_id).await, - ClientWrapper::Quic(client) => client.delete_topic(stream_id, topic_id).await, - ClientWrapper::TcpSync(client) => client.delete_topic(stream_id, topic_id).await, + fn delete_topic( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.delete_topic(stream_id, topic_id), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } } - } - async fn purge_topic( - &self, - stream_id: &Identifier, - topic_id: &Identifier, - ) -> Result<(), IggyError> { - match self { - ClientWrapper::Iggy(client) => client.purge_topic(stream_id, topic_id).await, - ClientWrapper::Http(client) => client.purge_topic(stream_id, topic_id).await, - ClientWrapper::Tcp(client) => client.purge_topic(stream_id, topic_id).await, - ClientWrapper::Quic(client) => client.purge_topic(stream_id, topic_id).await, - ClientWrapper::TcpSync(client) => client.purge_topic(stream_id, topic_id).await, + fn purge_topic( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.purge_topic(stream_id, topic_id), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } } } -} +} \ No newline at end of file diff --git a/core/sdk/src/client_wrappers/binary_user_client.rs b/core/sdk/src/client_wrappers/binary_user_client.rs index 486e937e..3838be28 100644 --- a/core/sdk/src/client_wrappers/binary_user_client.rs +++ b/core/sdk/src/client_wrappers/binary_user_client.rs @@ -16,163 +16,244 @@ * under the License. */ -use crate::client_wrappers::client_wrapper::ClientWrapper; -use async_trait::async_trait; +use crate::client_wrappers::ClientWrapper; use iggy_binary_protocol::UserClient; use iggy_common::{ Identifier, IdentityInfo, IggyError, Permissions, UserInfo, UserInfoDetails, UserStatus, }; -#[maybe_async::maybe_async] -#[async_trait] -impl UserClient for ClientWrapper { - async fn get_user(&self, user_id: &Identifier) -> Result<Option<UserInfoDetails>, IggyError> { - match self { - ClientWrapper::Iggy(client) => client.get_user(user_id).await, - ClientWrapper::Http(client) => client.get_user(user_id).await, - ClientWrapper::Tcp(client) => client.get_user(user_id).await, - ClientWrapper::Quic(client) => client.get_user(user_id).await, - ClientWrapper::TcpSync(client) => client.get_user(user_id).await, +#[cfg(feature = "async")] +pub mod async_impl { + use super::*; + use async_trait::async_trait; + + #[async_trait] + impl UserClient for ClientWrapper { + async fn get_user(&self, user_id: &Identifier) -> Result<Option<UserInfoDetails>, IggyError> { + match self { + ClientWrapper::Iggy(client) => client.get_user(user_id).await, + ClientWrapper::Http(client) => client.get_user(user_id).await, + ClientWrapper::Tcp(client) => client.get_user(user_id).await, + ClientWrapper::Quic(client) => client.get_user(user_id).await, + } } - } - async fn get_users(&self) -> Result<Vec<UserInfo>, IggyError> { - match self { - ClientWrapper::Iggy(client) => client.get_users().await, - ClientWrapper::Http(client) => client.get_users().await, - ClientWrapper::Tcp(client) => client.get_users().await, - ClientWrapper::Quic(client) => client.get_users().await, - ClientWrapper::TcpSync(client) => client.get_users().await, + async fn get_users(&self) -> Result<Vec<UserInfo>, IggyError> { + match self { + ClientWrapper::Iggy(client) => client.get_users().await, + ClientWrapper::Http(client) => client.get_users().await, + ClientWrapper::Tcp(client) => client.get_users().await, + ClientWrapper::Quic(client) => client.get_users().await, + } } - } - async fn create_user( - &self, - username: &str, - password: &str, - status: UserStatus, - permissions: Option<Permissions>, - ) -> Result<UserInfoDetails, IggyError> { - match self { - ClientWrapper::Http(client) => { - client - .create_user(username, password, status, permissions) - .await - } - ClientWrapper::Tcp(client) => { - client - .create_user(username, password, status, permissions) - .await - } - ClientWrapper::Quic(client) => { - client - .create_user(username, password, status, permissions) - .await - } - ClientWrapper::Iggy(client) => { - client - .create_user(username, password, status, permissions) - .await - } - ClientWrapper::TcpSync(client) => { - client - .create_user(username, password, status, permissions) - .await + async fn create_user( + &self, + username: &str, + password: &str, + status: UserStatus, + permissions: Option<Permissions>, + ) -> Result<UserInfoDetails, IggyError> { + match self { + ClientWrapper::Iggy(client) => { + client + .create_user(username, password, status, permissions) + .await + } + ClientWrapper::Http(client) => { + client + .create_user(username, password, status, permissions) + .await + } + ClientWrapper::Tcp(client) => { + client + .create_user(username, password, status, permissions) + .await + } + ClientWrapper::Quic(client) => { + client + .create_user(username, password, status, permissions) + .await + } } } - } - async fn delete_user(&self, user_id: &Identifier) -> Result<(), IggyError> { - match self { - ClientWrapper::Http(client) => client.delete_user(user_id).await, - ClientWrapper::Tcp(client) => client.delete_user(user_id).await, - ClientWrapper::Quic(client) => client.delete_user(user_id).await, - ClientWrapper::Iggy(client) => client.delete_user(user_id).await, - ClientWrapper::TcpSync(client) => client.delete_user(user_id).await, + async fn delete_user(&self, user_id: &Identifier) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => client.delete_user(user_id).await, + ClientWrapper::Http(client) => client.delete_user(user_id).await, + ClientWrapper::Tcp(client) => client.delete_user(user_id).await, + ClientWrapper::Quic(client) => client.delete_user(user_id).await, + } } - } - async fn update_user( - &self, - user_id: &Identifier, - username: Option<&str>, - status: Option<UserStatus>, - ) -> Result<(), IggyError> { - match self { - ClientWrapper::Http(client) => client.update_user(user_id, username, status).await, - ClientWrapper::Tcp(client) => client.update_user(user_id, username, status).await, - ClientWrapper::Quic(client) => client.update_user(user_id, username, status).await, - ClientWrapper::Iggy(client) => client.update_user(user_id, username, status).await, - ClientWrapper::TcpSync(client) => client.update_user(user_id, username, status).await, + async fn update_user( + &self, + user_id: &Identifier, + username: Option<&str>, + status: Option<UserStatus>, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => client.update_user(user_id, username, status).await, + ClientWrapper::Http(client) => client.update_user(user_id, username, status).await, + ClientWrapper::Tcp(client) => client.update_user(user_id, username, status).await, + ClientWrapper::Quic(client) => client.update_user(user_id, username, status).await, + } } - } - async fn update_permissions( - &self, - user_id: &Identifier, - permissions: Option<Permissions>, - ) -> Result<(), IggyError> { - match self { - ClientWrapper::Iggy(client) => client.update_permissions(user_id, permissions).await, - ClientWrapper::Http(client) => client.update_permissions(user_id, permissions).await, - ClientWrapper::Tcp(client) => client.update_permissions(user_id, permissions).await, - ClientWrapper::Quic(client) => client.update_permissions(user_id, permissions).await, - ClientWrapper::TcpSync(client) => client.update_permissions(user_id, permissions).await, + async fn update_permissions( + &self, + user_id: &Identifier, + permissions: Option<Permissions>, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => client.update_permissions(user_id, permissions).await, + ClientWrapper::Http(client) => client.update_permissions(user_id, permissions).await, + ClientWrapper::Tcp(client) => client.update_permissions(user_id, permissions).await, + ClientWrapper::Quic(client) => client.update_permissions(user_id, permissions).await, + } } - } - async fn change_password( - &self, - user_id: &Identifier, - current_password: &str, - new_password: &str, - ) -> Result<(), IggyError> { - match self { - ClientWrapper::Http(client) => { - client - .change_password(user_id, current_password, new_password) - .await - } - ClientWrapper::Tcp(client) => { - client - .change_password(user_id, current_password, new_password) - .await - } - ClientWrapper::Quic(client) => { - client - .change_password(user_id, current_password, new_password) - .await - } - ClientWrapper::Iggy(client) => { - client - .change_password(user_id, current_password, new_password) - .await - } - ClientWrapper::TcpSync(client) => { - client - .change_password(user_id, current_password, new_password) - .await + async fn change_password( + &self, + user_id: &Identifier, + current_password: &str, + new_password: &str, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => { + client + .change_password(user_id, current_password, new_password) + .await + } + ClientWrapper::Http(client) => { + client + .change_password(user_id, current_password, new_password) + .await + } + ClientWrapper::Tcp(client) => { + client + .change_password(user_id, current_password, new_password) + .await + } + ClientWrapper::Quic(client) => { + client + .change_password(user_id, current_password, new_password) + .await + } } } - } - async fn login_user(&self, username: &str, password: &str) -> Result<IdentityInfo, IggyError> { - match self { - ClientWrapper::Iggy(client) => client.login_user(username, password).await, - ClientWrapper::Http(client) => client.login_user(username, password).await, - ClientWrapper::Tcp(client) => client.login_user(username, password).await, - ClientWrapper::Quic(client) => client.login_user(username, password).await, - ClientWrapper::TcpSync(client) => client.login_user(username, password).await, + async fn login_user(&self, username: &str, password: &str) -> Result<IdentityInfo, IggyError> { + match self { + ClientWrapper::Iggy(client) => client.login_user(username, password).await, + ClientWrapper::Http(client) => client.login_user(username, password).await, + ClientWrapper::Tcp(client) => client.login_user(username, password).await, + ClientWrapper::Quic(client) => client.login_user(username, password).await, + } } - } - async fn logout_user(&self) -> Result<(), IggyError> { - match self { - ClientWrapper::Iggy(client) => client.logout_user().await, - ClientWrapper::Http(client) => client.logout_user().await, - ClientWrapper::Tcp(client) => client.logout_user().await, - ClientWrapper::Quic(client) => client.logout_user().await, - ClientWrapper::TcpSync(client) => client.logout_user().await, + async fn logout_user(&self) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => client.logout_user().await, + ClientWrapper::Http(client) => client.logout_user().await, + ClientWrapper::Tcp(client) => client.logout_user().await, + ClientWrapper::Quic(client) => client.logout_user().await, + } } } } + +#[cfg(feature = "sync")] +pub mod sync_impl { + use super::*; + + impl UserClient for ClientWrapper { + fn get_user(&self, user_id: &Identifier) -> Result<Option<UserInfoDetails>, IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.get_user(user_id), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + + fn get_users(&self) -> Result<Vec<UserInfo>, IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.get_users(), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + + fn create_user( + &self, + username: &str, + password: &str, + status: UserStatus, + permissions: Option<Permissions>, + ) -> Result<UserInfoDetails, IggyError> { + match self { + ClientWrapper::TcpSync(client) => { + client.create_user(username, password, status, permissions) + } + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + + fn delete_user(&self, user_id: &Identifier) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.delete_user(user_id), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + + fn update_user( + &self, + user_id: &Identifier, + username: Option<&str>, + status: Option<UserStatus>, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.update_user(user_id, username, status), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + + fn update_permissions( + &self, + user_id: &Identifier, + permissions: Option<Permissions>, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.update_permissions(user_id, permissions), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + + fn change_password( + &self, + user_id: &Identifier, + current_password: &str, + new_password: &str, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => { + client.change_password(user_id, current_password, new_password) + } + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + + fn login_user(&self, username: &str, password: &str) -> Result<IdentityInfo, IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.login_user(username, password), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + + fn logout_user(&self) -> Result<(), IggyError> { + match self { + ClientWrapper::TcpSync(client) => client.logout_user(), + ClientWrapper::Iggy(_) => Err(IggyError::InvalidConfiguration), + } + } + } +} \ No newline at end of file diff --git a/core/sdk/src/client_wrappers/client_wrapper.rs b/core/sdk/src/client_wrappers/client_wrapper.rs index 12f758f5..1a0bf9e8 100644 --- a/core/sdk/src/client_wrappers/client_wrapper.rs +++ b/core/sdk/src/client_wrappers/client_wrapper.rs @@ -17,16 +17,32 @@ */ use crate::clients::client::IggyClient; -use crate::http::http_client::HttpClient; -use crate::quic::quic_client::QuicClient; -use crate::tcp::tcp_client::TcpClient; -#[allow(clippy::large_enum_variant)] -#[derive(Debug)] -pub enum ClientWrapper { - Iggy(IggyClient), - Http(HttpClient), - Tcp(TcpClient), - TcpSync(Box<dyn iggy_binary_protocol::Client>), - Quic(QuicClient), +#[cfg(feature = "async")] +pub mod async_impl { + use super::*; + use crate::http::http_client::HttpClient; + use crate::quic::quic_client::QuicClient; + use crate::tcp::tcp_client::TcpClient; + + #[allow(clippy::large_enum_variant)] + #[derive(Debug)] + pub enum ClientWrapper { + Iggy(IggyClient), + Http(HttpClient), + Tcp(TcpClient), + Quic(QuicClient), + } +} + +#[cfg(feature = "sync")] +pub mod sync_impl { + use super::*; + + #[allow(clippy::large_enum_variant)] + #[derive(Debug)] + pub enum ClientWrapper { + Iggy(IggyClient), + TcpSync(Box<dyn iggy_binary_protocol::Client>), + } } diff --git a/core/sdk/src/client_wrappers/mod.rs b/core/sdk/src/client_wrappers/mod.rs index 04911271..a2fc5444 100644 --- a/core/sdk/src/client_wrappers/mod.rs +++ b/core/sdk/src/client_wrappers/mod.rs @@ -27,4 +27,11 @@ mod binary_stream_client; mod binary_system_client; mod binary_topic_client; mod binary_user_client; -pub mod client_wrapper; + +mod client_wrapper; + +#[cfg(feature = "sync")] +pub use client_wrapper::sync_impl::ClientWrapper; + +#[cfg(feature = "async")] +pub use client_wrapper::async_impl::ClientWrapper; diff --git a/core/sdk/src/clients/client.rs b/core/sdk/src/clients/client.rs index e8e30520..84f6e8b0 100644 --- a/core/sdk/src/clients/client.rs +++ b/core/sdk/src/clients/client.rs @@ -19,26 +19,33 @@ use crate::clients::client_builder::IggyClientBuilder; use iggy_common::locking::{IggySharedMut, IggySharedMutFn}; -use crate::client_wrappers::client_wrapper::ClientWrapper; +use crate::client_wrappers::ClientWrapper; +#[cfg(feature = "async")] use crate::http::http_client::HttpClient; use crate::prelude::EncryptorKind; use crate::prelude::IggyConsumerBuilder; use crate::prelude::IggyError; use crate::prelude::IggyProducerBuilder; +#[cfg(feature = "async")] use crate::quic::quic_client::QuicClient; +#[cfg(feature = "async")] use crate::tcp::tcp_client::TcpClient; +#[cfg(feature = "sync")] +use crate::tcp::tcp_client_sync::TcpClientSync; +#[cfg(feature = "sync")] +use crate::connection::transport::TcpTransport; use async_broadcast::Receiver; +#[cfg(feature = "async")] use async_trait::async_trait; use iggy_binary_protocol::{Client, SystemClient}; +#[cfg(feature = "async")] +use tokio::{spawn, time::sleep}; use iggy_common::{ ConnectionStringUtils, Consumer, DiagnosticEvent, Partitioner, TransportProtocol, }; use std::fmt::Debug; use std::sync::Arc; -use tokio::spawn; -use tokio::time::sleep; -use tracing::log::warn; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; /// The main client struct which implements all the `Client` traits and wraps the underlying low-level client for the specific transport. /// @@ -53,7 +60,14 @@ pub struct IggyClient { impl Default for IggyClient { fn default() -> Self { - IggyClient::new(ClientWrapper::Tcp(TcpClient::default())) + #[cfg(feature = "async")] + { + IggyClient::new(ClientWrapper::Tcp(TcpClient::default())) + } + #[cfg(feature = "sync")] + { + IggyClient::new(ClientWrapper::TcpSync(Box::new(TcpClientSync::default()))) + } } } @@ -83,15 +97,24 @@ impl IggyClient { /// Creates a new `IggyClient` from the provided connection string. pub fn from_connection_string(connection_string: &str) -> Result<Self, IggyError> { match ConnectionStringUtils::parse_protocol(connection_string)? { + #[cfg(feature = "async")] TransportProtocol::Tcp => Ok(IggyClient::new(ClientWrapper::Tcp( TcpClient::from_connection_string(connection_string)?, ))), + #[cfg(feature = "sync")] + TransportProtocol::Tcp => Ok(IggyClient::new(ClientWrapper::TcpSync(Box::new( + TcpClientSync::<TcpTransport>::from_connection_string(connection_string)?, + )))), + #[cfg(feature = "async")] TransportProtocol::Quic => Ok(IggyClient::new(ClientWrapper::Quic( QuicClient::from_connection_string(connection_string)?, ))), + #[cfg(feature = "async")] TransportProtocol::Http => Ok(IggyClient::new(ClientWrapper::Http( HttpClient::from_connection_string(connection_string)?, ))), + #[cfg(feature = "sync")] + _ => Err(IggyError::InvalidConfiguration), } } @@ -174,45 +197,74 @@ impl IggyClient { } } -#[async_trait] -impl Client for IggyClient { - async fn connect(&self) -> Result<(), IggyError> { - let heartbeat_interval; - { - let client = self.client.read().await; - client.connect().await?; - heartbeat_interval = client.heartbeat_interval().await; - } +#[cfg(feature = "async")] +mod async_impl { + use super::*; - let client = self.client.clone(); - spawn(async move { - loop { - debug!("Sending the heartbeat..."); - if let Err(error) = client.read().await.ping().await { - error!("There was an error when sending a heartbeat. {error}"); - if error == IggyError::ClientShutdown { - warn!("The client has been shut down - stopping the heartbeat."); - return; + #[async_trait] + impl Client for IggyClient { + async fn connect(&self) -> Result<(), IggyError> { + let heartbeat_interval; + { + let client = self.client.read().await; + client.connect().await?; + heartbeat_interval = client.heartbeat_interval().await; + } + + let client = self.client.clone(); + spawn(async move { + loop { + debug!("Sending the heartbeat..."); + if let Err(error) = client.read().await.ping().await { + error!("There was an error when sending a heartbeat. {error}"); + if error == IggyError::ClientShutdown { + warn!("The client has been shut down - stopping the heartbeat."); + return; + } + } else { + debug!("Heartbeat was sent successfully."); } - } else { - debug!("Heartbeat was sent successfully."); + sleep(heartbeat_interval.get_duration()).await } - sleep(heartbeat_interval.get_duration()).await - } - }); - Ok(()) - } + }); + Ok(()) + } - async fn disconnect(&self) -> Result<(), IggyError> { - self.client.read().await.disconnect().await - } + async fn disconnect(&self) -> Result<(), IggyError> { + self.client.read().disconnect().await + } - async fn shutdown(&self) -> Result<(), IggyError> { - self.client.read().await.shutdown().await + async fn shutdown(&self) -> Result<(), IggyError> { + self.client.read().shutdown().await + } + + async fn subscribe_events(&self) -> Receiver<DiagnosticEvent> { + self.client.read().subscribe_events().await + } } +} - async fn subscribe_events(&self) -> Receiver<DiagnosticEvent> { - self.client.read().await.subscribe_events().await +#[cfg(feature = "sync")] +mod sync_impl { + use super::*; + + impl Client for IggyClient { + fn connect(&self) -> Result<(), IggyError> { + let client = self.client.read(); + client.connect() + } + + fn disconnect(&self) -> Result<(), IggyError> { + self.client.read().disconnect() + } + + fn shutdown(&self) -> Result<(), IggyError> { + self.client.read().shutdown() + } + + fn subscribe_events(&self) -> Receiver<DiagnosticEvent> { + self.client.read().subscribe_events() + } } } diff --git a/core/sdk/src/clients/client_builder.rs b/core/sdk/src/clients/client_builder.rs index 205dcd8f..c993e6f5 100644 --- a/core/sdk/src/clients/client_builder.rs +++ b/core/sdk/src/clients/client_builder.rs @@ -16,15 +16,19 @@ * under the License. */ -use crate::client_wrappers::client_wrapper::ClientWrapper; +use crate::client_wrappers::ClientWrapper; use crate::clients::client::IggyClient; +#[cfg(feature = "async")] use crate::http::http_client::HttpClient; -use crate::prelude::{ - AutoLogin, EncryptorKind, HttpClientConfigBuilder, IggyDuration, IggyError, Partitioner, - QuicClientConfigBuilder, TcpClientConfigBuilder, -}; +use crate::prelude::{AutoLogin, EncryptorKind, IggyDuration, IggyError, Partitioner}; +#[cfg(feature = "async")] +use crate::prelude::{HttpClientConfigBuilder, QuicClientConfigBuilder, TcpClientConfigBuilder}; +#[cfg(feature = "async")] use crate::quic::quic_client::QuicClient; +#[cfg(feature = "async")] use crate::tcp::tcp_client::TcpClient; +#[cfg(feature = "sync")] +use crate::tcp::tcp_client_sync::TcpClientSync; use iggy_common::{ConnectionStringUtils, TransportProtocol}; use std::sync::Arc; use tracing::error; @@ -48,6 +52,7 @@ impl IggyClientBuilder { pub fn from_connection_string(connection_string: &str) -> Result<Self, IggyError> { let mut builder = Self::new(); + #[cfg(feature = "async")] match ConnectionStringUtils::parse_protocol(connection_string)? { TransportProtocol::Tcp => { builder.client = Some(ClientWrapper::Tcp(TcpClient::from_connection_string( @@ -66,6 +71,18 @@ impl IggyClientBuilder { } } + #[cfg(feature = "sync")] + match ConnectionStringUtils::parse_protocol(connection_string)? { + TransportProtocol::Tcp => { + builder.client = Some(ClientWrapper::TcpSync(Box::new( + TcpClientSync::from_connection_string(connection_string)?, + ))); + } + _ => { + return Err(IggyError::InvalidConfiguration); + } + } + Ok(builder) } @@ -90,6 +107,7 @@ impl IggyClientBuilder { /// This method provides fluent API for the TCP client configuration. /// It returns the `TcpClientBuilder` instance, which allows to configure the TCP client with custom settings or using defaults. /// This should be called after the non-protocol specific methods, such as `with_partitioner`, `with_encryptor` or `with_message_handler`. + #[cfg(feature = "async")] pub fn with_tcp(self) -> TcpClientBuilder { TcpClientBuilder { config: TcpClientConfigBuilder::default(), @@ -100,6 +118,7 @@ impl IggyClientBuilder { /// This method provides fluent API for the QUIC client configuration. /// It returns the `QuicClientBuilder` instance, which allows to configure the QUIC client with custom settings or using defaults. /// This should be called after the non-protocol specific methods, such as `with_partitioner`, `with_encryptor` or `with_message_handler`. + #[cfg(feature = "async")] pub fn with_quic(self) -> QuicClientBuilder { QuicClientBuilder { config: QuicClientConfigBuilder::default(), @@ -110,6 +129,7 @@ impl IggyClientBuilder { /// This method provides fluent API for the HTTP client configuration. /// It returns the `HttpClientBuilder` instance, which allows to configure the HTTP client with custom settings or using defaults. /// This should be called after the non-protocol specific methods, such as `with_partitioner`, `with_encryptor` or `with_message_handler`. + #[cfg(feature = "async")] pub fn with_http(self) -> HttpClientBuilder { HttpClientBuilder { config: HttpClientConfigBuilder::default(), @@ -131,12 +151,14 @@ impl IggyClientBuilder { } } +#[cfg(feature = "async")] #[derive(Debug, Default)] pub struct TcpClientBuilder { config: TcpClientConfigBuilder, parent_builder: IggyClientBuilder, } +#[cfg(feature = "async")] impl TcpClientBuilder { /// Sets the server address for the TCP client. pub fn with_server_address(mut self, server_address: String) -> Self { @@ -209,12 +231,14 @@ impl TcpClientBuilder { } } +#[cfg(feature = "async")] #[derive(Debug, Default)] pub struct QuicClientBuilder { config: QuicClientConfigBuilder, parent_builder: IggyClientBuilder, } +#[cfg(feature = "async")] impl QuicClientBuilder { /// Sets the server address for the QUIC client. pub fn with_server_address(mut self, server_address: String) -> Self { @@ -261,12 +285,14 @@ impl QuicClientBuilder { } } +#[cfg(feature = "async")] #[derive(Debug, Default)] pub struct HttpClientBuilder { config: HttpClientConfigBuilder, parent_builder: IggyClientBuilder, } +#[cfg(feature = "async")] impl HttpClientBuilder { /// Sets the server address for the HTTP client. pub fn with_api_url(mut self, api_url: String) -> Self { diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs index bd84a60c..30fe1509 100644 --- a/core/sdk/src/clients/consumer.rs +++ b/core/sdk/src/clients/consumer.rs @@ -16,7 +16,7 @@ * under the License. */ -use crate::client_wrappers::client_wrapper::ClientWrapper; +use crate::client_wrappers::ClientWrapper; use bytes::Bytes; use dashmap::DashMap; use futures::Stream; diff --git a/core/sdk/src/clients/consumer_builder.rs b/core/sdk/src/clients/consumer_builder.rs index 576d841e..15c1fbe5 100644 --- a/core/sdk/src/clients/consumer_builder.rs +++ b/core/sdk/src/clients/consumer_builder.rs @@ -16,7 +16,7 @@ * under the License. */ -use crate::client_wrappers::client_wrapper::ClientWrapper; +use crate::client_wrappers::ClientWrapper; use crate::prelude::{AutoCommit, AutoCommitWhen, IggyConsumer}; use iggy_common::locking::IggySharedMut; use iggy_common::{Consumer, EncryptorKind, Identifier, IggyDuration, PollingStrategy}; diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs index 5eee0e25..ac21d0b5 100644 --- a/core/sdk/src/clients/producer.rs +++ b/core/sdk/src/clients/producer.rs @@ -16,7 +16,7 @@ * under the License. */ use super::ORDERING; -use crate::client_wrappers::client_wrapper::ClientWrapper; +use crate::client_wrappers::ClientWrapper; use crate::clients::MAX_BATCH_LENGTH; use crate::clients::producer_builder::SendMode; use crate::clients::producer_config::DirectConfig; diff --git a/core/sdk/src/clients/producer_builder.rs b/core/sdk/src/clients/producer_builder.rs index ae8bb368..03b9a8d6 100644 --- a/core/sdk/src/clients/producer_builder.rs +++ b/core/sdk/src/clients/producer_builder.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::client_wrappers::client_wrapper::ClientWrapper; +use crate::client_wrappers::ClientWrapper; use crate::clients::producer_config::{BackgroundConfig, DirectConfig}; use crate::prelude::IggyProducer; use iggy_common::locking::IggySharedMut; diff --git a/core/sdk/src/stream_builder/build/build_iggy_consumer.rs b/core/sdk/src/stream_builder/build/build_iggy_consumer.rs index 2cd105eb..9763cdcf 100644 --- a/core/sdk/src/stream_builder/build/build_iggy_consumer.rs +++ b/core/sdk/src/stream_builder/build/build_iggy_consumer.rs @@ -20,6 +20,7 @@ use crate::clients::client::IggyClient; use crate::clients::consumer::IggyConsumer; use crate::prelude::{ConsumerKind, IggyError}; use crate::stream_builder::IggyConsumerConfig; +use futures::TryFutureExt; use tracing::{error, trace}; /// Builds an `IggyConsumer` from the given `IggyClient` and `IggyConsumerConfig`. @@ -39,6 +40,7 @@ use tracing::{error, trace}; /// The `IggyConsumerConfig` fields are used to configure the `IggyConsumer`. /// #[maybe_async::maybe_async] +#[cfg(feature = "async")] pub(crate) async fn build_iggy_consumer( client: &IggyClient, config: &IggyConsumerConfig, diff --git a/core/sdk/src/stream_builder/build/build_iggy_producer.rs b/core/sdk/src/stream_builder/build/build_iggy_producer.rs index 5780b41f..7b94235d 100644 --- a/core/sdk/src/stream_builder/build/build_iggy_producer.rs +++ b/core/sdk/src/stream_builder/build/build_iggy_producer.rs @@ -21,6 +21,7 @@ use crate::clients::producer::IggyProducer; use crate::clients::producer_config::DirectConfig; use crate::prelude::{IggyError, IggyExpiry, MaxTopicSize}; use crate::stream_builder::IggyProducerConfig; +use futures::TryFutureExt; use tracing::{error, trace}; /// Build a producer from the stream configuration. diff --git a/core/sdk/src/tcp/tcp_client_sync.rs b/core/sdk/src/tcp/tcp_client_sync.rs index 91c539a9..2f8a5f2e 100644 --- a/core/sdk/src/tcp/tcp_client_sync.rs +++ b/core/sdk/src/tcp/tcp_client_sync.rs @@ -174,7 +174,7 @@ where } fn publish_event(&self, event: DiagnosticEvent) { - if let Err(error) = self.events.0.broadcast(event) { + if let Err(error) = self.events.0.try_broadcast(event) { error!("Failed to send a TCP diagnostic event: {error}"); } }
