This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch add-sync-client in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 031d7de97639cb79a50525042d08b06cc92c1dd5 Author: haze518 <[email protected]> AuthorDate: Fri Sep 19 07:51:52 2025 +0600 complete sync client --- core/common/src/error/iggy_error.rs | 6 + core/integration/src/lib.rs | 1 + core/integration/src/tcp_client_sync.rs | 88 +++ core/integration/tests/sdk/mod.rs | 1 + core/integration/tests/sdk/tcp_client_sync.rs | 274 +++++++ core/sdk/src/client_wrappers/binary_client.rs | 4 + .../binary_consumer_group_client.rs | 29 + .../binary_consumer_offset_client.rs | 15 + .../src/client_wrappers/binary_message_client.rs | 23 + .../src/client_wrappers/binary_partition_client.rs | 10 + .../binary_personal_access_token_client.rs | 4 + .../src/client_wrappers/binary_segment_client.rs | 5 + .../src/client_wrappers/binary_stream_client.rs | 6 + .../src/client_wrappers/binary_system_client.rs | 7 + .../sdk/src/client_wrappers/binary_topic_client.rs | 31 + core/sdk/src/client_wrappers/binary_user_client.rs | 17 + core/sdk/src/client_wrappers/client_wrapper.rs | 1 + core/sdk/src/connection/mod.rs | 1 + core/sdk/src/connection/transport.rs | 178 +++++ core/sdk/src/lib.rs | 2 + core/sdk/src/prelude.rs | 1 + core/sdk/src/protocol/core.rs | 796 +++++++++++++++++++ core/sdk/src/protocol/mod.rs | 1 + core/sdk/src/tcp/mod.rs | 2 + core/sdk/src/tcp/tcp_client_sync.rs | 862 +++++++++++++++++++++ 25 files changed, 2365 insertions(+) diff --git a/core/common/src/error/iggy_error.rs b/core/common/src/error/iggy_error.rs index cc8a284b..2597e154 100644 --- a/core/common/src/error/iggy_error.rs +++ b/core/common/src/error/iggy_error.rs @@ -459,6 +459,12 @@ pub enum IggyError { CannotReadIndexPosition = 10011, #[error("Cannot read index timestamp")] CannotReadIndexTimestamp = 10012, + #[error("Max number of retry has exceeded")] + MaxRetriesExceeded = 10050, + #[error("Connection missed socket")] + ConnectionMissedSocket = 10051, + #[error("Incorrect connection state")] + IncorrectConnectionState = 10052, } impl IggyError { diff --git a/core/integration/src/lib.rs b/core/integration/src/lib.rs index 88c51ac1..1b35d43b 100644 --- a/core/integration/src/lib.rs +++ b/core/integration/src/lib.rs @@ -24,6 +24,7 @@ pub mod http_client; pub mod quic_client; #[allow(deprecated)] pub mod tcp_client; +pub mod tcp_client_sync; #[allow(deprecated)] pub mod test_mcp_server; #[allow(deprecated)] diff --git a/core/integration/src/tcp_client_sync.rs b/core/integration/src/tcp_client_sync.rs new file mode 100644 index 00000000..e885686c --- /dev/null +++ b/core/integration/src/tcp_client_sync.rs @@ -0,0 +1,88 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::test_server::{ClientFactory, Transport}; +use async_trait::async_trait; +use iggy::prelude::{ClientWrapper, TcpClientConfig, TcpClientSyncTcp, TcpClientSyncTls}; +use std::sync::Arc; + +#[derive(Debug, Clone, Default)] +pub struct TcpClientSyncFactory { + pub server_addr: String, + pub nodelay: bool, + pub tls_enabled: bool, + pub tls_domain: String, + pub tls_ca_file: Option<String>, + pub tls_validate_certificate: bool, +} + +#[async_trait] +impl ClientFactory for TcpClientSyncFactory { + async fn create_client(&self) -> ClientWrapper { + let config = TcpClientConfig { + server_address: self.server_addr.clone(), + nodelay: self.nodelay, + tls_enabled: self.tls_enabled, + tls_domain: self.tls_domain.clone(), + tls_ca_file: self.tls_ca_file.clone(), + tls_validate_certificate: self.tls_validate_certificate, + ..TcpClientConfig::default() + }; + + let client: Box<dyn iggy_binary_protocol::Client> = if self.tls_enabled { + Box::new(TcpClientSyncTls::create_tcp_tls(Arc::new(config)).unwrap_or_else(|e| { + panic!( + "Failed to create TcpClientSyncTls, iggy-server has address {}, error: {:?}", + self.server_addr, e + ) + })) + } else { + Box::new(TcpClientSyncTcp::create_tcp(Arc::new(config)).unwrap_or_else(|e| { + panic!( + "Failed to create TcpClientSyncTcp, iggy-server has address {}, error: {:?}", + self.server_addr, e + ) + })) + }; + + // Connect the client + client.connect().await.unwrap_or_else(|e| { + if self.tls_enabled { + panic!( + "Failed to connect to iggy-server at {} with TLS enabled, error: {:?}\\n\\\n Hint: Make sure the server is started with TLS enabled and self-signed certificate:\\n\\\n IGGY_TCP_TLS_ENABLED=true IGGY_TCP_TLS_SELF_SIGNED=true\\n\n or start iggy-bench with relevant tcp tls arguments: --tls --tls-domain <domain> --tls-ca-file <ca_file>\\n", + self.server_addr, e + ) + } else { + panic!( + "Failed to connect to iggy-server at {}, error: {:?}", + self.server_addr, e + ) + } + }); + + ClientWrapper::TcpSync(client) + } + + fn transport(&self) -> Transport { + Transport::Tcp + } + + fn server_addr(&self) -> String { + self.server_addr.clone() + } +} \ No newline at end of file diff --git a/core/integration/tests/sdk/mod.rs b/core/integration/tests/sdk/mod.rs index 6d94bfa6..26796a62 100644 --- a/core/integration/tests/sdk/mod.rs +++ b/core/integration/tests/sdk/mod.rs @@ -17,3 +17,4 @@ */ mod producer; +mod tcp_client_sync; diff --git a/core/integration/tests/sdk/tcp_client_sync.rs b/core/integration/tests/sdk/tcp_client_sync.rs new file mode 100644 index 00000000..7c79d65a --- /dev/null +++ b/core/integration/tests/sdk/tcp_client_sync.rs @@ -0,0 +1,274 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use std::sync::Arc; + +use iggy::prelude::*; +use integration::test_server::{TestServer, IpAddrKind, ClientFactory}; +use integration::tcp_client_sync::TcpClientSyncFactory; +use serial_test::serial; +use bytes::Bytes; + +#[tokio::test] +#[serial] +async fn should_connect_and_ping_tcp_sync_client() { + let mut test_server = TestServer::default(); + test_server.start(); + + let tcp_client_config = TcpClientConfig { + server_address: test_server.get_raw_tcp_addr().unwrap(), + ..TcpClientConfig::default() + }; + let client = ClientWrapper::TcpSync(Box::new(TcpClientSyncTcp::create_tcp(Arc::new(tcp_client_config)).unwrap())); + let client = IggyClient::create(client, None, None); + + client.connect().await.unwrap(); + assert!(client.ping().await.is_ok(), "Failed to ping server"); + + // let factory = TcpClientSyncFactory { + // server_addr: test_server.get_raw_tcp_addr().unwrap(), + // nodelay: false, + // tls_enabled: false, + // tls_domain: "localhost".to_string(), + // tls_ca_file: None, + // tls_validate_certificate: false, + // }; + + // let client_wrapper = factory.create_client().await; + + // Test direct ping on the wrapper (which should already be connected) + // match &client_wrapper { + // ClientWrapper::TcpSync(sync_client) => { + // sync_client.ping().await.unwrap(); + + // // Test login as root user + // sync_client + // .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + // .await + // .unwrap(); + // } + // _ => panic!("Expected TcpSync client"), + // } + + test_server.stop(); +} + +#[tokio::test] +#[serial] +async fn should_perform_basic_streaming_operations() { + let mut server = TestServer::new(None, true, None, IpAddrKind::V4); + server.start(); + + let factory = TcpClientSyncFactory { + server_addr: server.get_raw_tcp_addr().unwrap(), + nodelay: false, + tls_enabled: false, + tls_domain: "localhost".to_string(), + tls_ca_file: None, + tls_validate_certificate: false, + }; + + let client_wrapper = factory.create_client().await; + let client = IggyClient::create(client_wrapper, None, None); + + // Connect first + client.connect().await.unwrap(); + + // Login as root + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + + let stream_id = 1; + let topic_id = 1; + let stream_name = "test-stream"; + let topic_name = "test-topic"; + + // Create stream + let stream = client + .create_stream(stream_name, Some(stream_id)) + .await + .unwrap(); + assert_eq!(stream.id, stream_id); + assert_eq!(stream.name, stream_name); + + // Create topic + let topic = client + .create_topic( + &Identifier::numeric(stream_id).unwrap(), + topic_name, + 1, // partitions_count + Default::default(), // compression_algorithm + None, // replication_factor + Some(topic_id), + IggyExpiry::NeverExpire, // message_expiry + MaxTopicSize::ServerDefault, // max_topic_size + ) + .await + .unwrap(); + assert_eq!(topic.id, topic_id); + assert_eq!(topic.name, topic_name); + + // Send a message + let payload = Bytes::from("Hello, World!"); + let mut message = IggyMessage::builder() + .payload(payload.clone()) + .build() + .unwrap(); + client + .send_messages( + &Identifier::numeric(stream_id).unwrap(), + &Identifier::numeric(topic_id).unwrap(), + &Partitioning::partition_id(1), + &mut [message], + ) + .await + .unwrap(); + + // Poll the message + let consumer = Consumer::default(); + let polled = client + .poll_messages( + &Identifier::numeric(stream_id).unwrap(), + &Identifier::numeric(topic_id).unwrap(), + Some(1), // partition_id + &consumer, + &PollingStrategy::next(), + 1, // count + false, // auto_commit + ) + .await + .unwrap(); + + assert_eq!(polled.messages.len(), 1); + assert_eq!( + polled.messages[0].payload, + payload + ); + + server.stop(); +} + +#[tokio::test] +#[serial] +async fn should_handle_authentication() { + let mut server = TestServer::new(None, true, None, IpAddrKind::V4); + server.start(); + + let factory = TcpClientSyncFactory { + server_addr: server.get_raw_tcp_addr().unwrap(), + nodelay: false, + tls_enabled: false, + tls_domain: "localhost".to_string(), + tls_ca_file: None, + tls_validate_certificate: false, + }; + + let client_wrapper = factory.create_client().await; + let client = IggyClient::create(client_wrapper, None, None); + + // Connect first + client.connect().await.unwrap(); + + // Test login as root user + let identity = client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + + assert_eq!(identity.user_id, DEFAULT_ROOT_USER_ID); + + // Test getting current user info + let me = client.get_me().await.unwrap(); + assert_eq!(me.user_id, Some(DEFAULT_ROOT_USER_ID)); + + // Test logout + client.logout_user().await.unwrap(); + + server.stop(); +} + +#[tokio::test] +#[serial] +async fn should_handle_connection_lifecycle() { + let mut server = TestServer::new(None, true, None, IpAddrKind::V4); + server.start(); + + let factory = TcpClientSyncFactory { + server_addr: server.get_raw_tcp_addr().unwrap(), + nodelay: false, + tls_enabled: false, + tls_domain: "localhost".to_string(), + tls_ca_file: None, + tls_validate_certificate: false, + }; + + let client_wrapper = factory.create_client().await; + let client = IggyClient::create(client_wrapper, None, None); + + // Connect first + client.connect().await.unwrap(); + + // Test basic ping after connection + client.ping().await.unwrap(); + + // Test getting server stats + let stats = client.get_stats().await.unwrap(); + assert!(stats.clients_count > 0); + + server.stop(); +} + +#[tokio::test] +#[serial] +async fn should_connect_with_tls() { + let mut server = TestServer::new( + Some([("IGGY_TCP_TLS_ENABLED".to_string(), "true".to_string())].into()), + true, + None, + IpAddrKind::V4, + ); + server.start(); + + let factory = TcpClientSyncFactory { + server_addr: server.get_raw_tcp_addr().unwrap(), + nodelay: false, + tls_enabled: true, + tls_domain: "localhost".to_string(), + tls_ca_file: None, + tls_validate_certificate: false, + }; + + let client_wrapper = factory.create_client().await; + let client = IggyClient::create(client_wrapper, None, None); + + // Connect first + client.connect().await.unwrap(); + + // Test basic ping with TLS + client.ping().await.unwrap(); + + // Test login with TLS + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + + server.stop(); +} \ No newline at end of file diff --git a/core/sdk/src/client_wrappers/binary_client.rs b/core/sdk/src/client_wrappers/binary_client.rs index 41d5744e..757d16a7 100644 --- a/core/sdk/src/client_wrappers/binary_client.rs +++ b/core/sdk/src/client_wrappers/binary_client.rs @@ -29,6 +29,7 @@ impl Client for ClientWrapper { ClientWrapper::Iggy(client) => client.connect().await, ClientWrapper::Http(client) => client.connect().await, ClientWrapper::Tcp(client) => client.connect().await, + ClientWrapper::TcpSync(client) => client.connect().await, ClientWrapper::Quic(client) => client.connect().await, } } @@ -38,6 +39,7 @@ impl Client for ClientWrapper { ClientWrapper::Iggy(client) => client.disconnect().await, ClientWrapper::Http(client) => client.disconnect().await, ClientWrapper::Tcp(client) => client.disconnect().await, + ClientWrapper::TcpSync(client) => client.disconnect().await, ClientWrapper::Quic(client) => client.disconnect().await, } } @@ -47,6 +49,7 @@ impl Client for ClientWrapper { ClientWrapper::Iggy(client) => client.shutdown().await, ClientWrapper::Http(client) => client.shutdown().await, ClientWrapper::Tcp(client) => client.shutdown().await, + ClientWrapper::TcpSync(client) => client.shutdown().await, ClientWrapper::Quic(client) => client.shutdown().await, } } @@ -56,6 +59,7 @@ impl Client for ClientWrapper { ClientWrapper::Iggy(client) => client.subscribe_events().await, ClientWrapper::Http(client) => client.subscribe_events().await, ClientWrapper::Tcp(client) => client.subscribe_events().await, + ClientWrapper::TcpSync(client) => client.subscribe_events().await, ClientWrapper::Quic(client) => client.subscribe_events().await, } } diff --git a/core/sdk/src/client_wrappers/binary_consumer_group_client.rs b/core/sdk/src/client_wrappers/binary_consumer_group_client.rs index 18117588..976be37c 100644 --- a/core/sdk/src/client_wrappers/binary_consumer_group_client.rs +++ b/core/sdk/src/client_wrappers/binary_consumer_group_client.rs @@ -46,6 +46,11 @@ impl ConsumerGroupClient for ClientWrapper { .get_consumer_group(stream_id, topic_id, group_id) .await } + ClientWrapper::TcpSync(client) => { + client + .get_consumer_group(stream_id, topic_id, group_id) + .await + } ClientWrapper::Quic(client) => { client .get_consumer_group(stream_id, topic_id, group_id) @@ -63,6 +68,7 @@ impl ConsumerGroupClient for ClientWrapper { ClientWrapper::Iggy(client) => client.get_consumer_groups(stream_id, topic_id).await, ClientWrapper::Http(client) => client.get_consumer_groups(stream_id, topic_id).await, ClientWrapper::Tcp(client) => client.get_consumer_groups(stream_id, topic_id).await, + ClientWrapper::TcpSync(client) => client.get_consumer_groups(stream_id, topic_id).await, ClientWrapper::Quic(client) => client.get_consumer_groups(stream_id, topic_id).await, } } @@ -90,6 +96,11 @@ impl ConsumerGroupClient for ClientWrapper { .create_consumer_group(stream_id, topic_id, name, group_id) .await } + ClientWrapper::TcpSync(client) => { + client + .create_consumer_group(stream_id, topic_id, name, group_id) + .await + } ClientWrapper::Quic(client) => { client .create_consumer_group(stream_id, topic_id, name, group_id) @@ -120,6 +131,11 @@ impl ConsumerGroupClient for ClientWrapper { .delete_consumer_group(stream_id, topic_id, group_id) .await } + ClientWrapper::TcpSync(client) => { + client + .delete_consumer_group(stream_id, topic_id, group_id) + .await + } ClientWrapper::Quic(client) => { client .delete_consumer_group(stream_id, topic_id, group_id) @@ -150,6 +166,11 @@ impl ConsumerGroupClient for ClientWrapper { .join_consumer_group(stream_id, topic_id, group_id) .await } + ClientWrapper::TcpSync(client) => { + client + .join_consumer_group(stream_id, topic_id, group_id) + .await + } ClientWrapper::Quic(client) => { client .join_consumer_group(stream_id, topic_id, group_id) @@ -180,6 +201,11 @@ impl ConsumerGroupClient for ClientWrapper { .leave_consumer_group(stream_id, topic_id, group_id) .await } + ClientWrapper::TcpSync(client) => { + client + .leave_consumer_group(stream_id, topic_id, group_id) + .await + } ClientWrapper::Quic(client) => { client .leave_consumer_group(stream_id, topic_id, group_id) @@ -202,6 +228,9 @@ impl AsyncDrop for ClientWrapper { ClientWrapper::Tcp(client) => { let _ = client.logout_user().await; } + ClientWrapper::TcpSync(client) => { + let _ = client.logout_user().await; + } ClientWrapper::Quic(client) => { let _ = client.logout_user().await; } diff --git a/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs b/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs index 668f120b..51c6ea2b 100644 --- a/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs +++ b/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs @@ -47,6 +47,11 @@ impl ConsumerOffsetClient for ClientWrapper { .store_consumer_offset(consumer, stream_id, topic_id, partition_id, offset) .await } + ClientWrapper::TcpSync(client) => { + client + .store_consumer_offset(consumer, stream_id, topic_id, partition_id, offset) + .await + } ClientWrapper::Quic(client) => { client .store_consumer_offset(consumer, stream_id, topic_id, partition_id, offset) @@ -78,6 +83,11 @@ impl ConsumerOffsetClient for ClientWrapper { .get_consumer_offset(consumer, stream_id, topic_id, partition_id) .await } + ClientWrapper::TcpSync(client) => { + client + .get_consumer_offset(consumer, stream_id, topic_id, partition_id) + .await + } ClientWrapper::Quic(client) => { client .get_consumer_offset(consumer, stream_id, topic_id, partition_id) @@ -109,6 +119,11 @@ impl ConsumerOffsetClient for ClientWrapper { .delete_consumer_offset(consumer, stream_id, topic_id, partition_id) .await } + ClientWrapper::TcpSync(client) => { + client + .delete_consumer_offset(consumer, stream_id, topic_id, partition_id) + .await + } ClientWrapper::Quic(client) => { client .delete_consumer_offset(consumer, stream_id, topic_id, partition_id) diff --git a/core/sdk/src/client_wrappers/binary_message_client.rs b/core/sdk/src/client_wrappers/binary_message_client.rs index 01e0c166..98db2924 100644 --- a/core/sdk/src/client_wrappers/binary_message_client.rs +++ b/core/sdk/src/client_wrappers/binary_message_client.rs @@ -75,6 +75,19 @@ impl MessageClient for ClientWrapper { ) .await } + ClientWrapper::TcpSync(client) => { + client + .poll_messages( + stream_id, + topic_id, + partition_id, + consumer, + strategy, + count, + auto_commit, + ) + .await + } ClientWrapper::Quic(client) => { client .poll_messages( @@ -114,6 +127,11 @@ impl MessageClient for ClientWrapper { .send_messages(stream_id, topic_id, partitioning, messages) .await } + ClientWrapper::TcpSync(client) => { + client + .send_messages(stream_id, topic_id, partitioning, messages) + .await + } ClientWrapper::Quic(client) => { client .send_messages(stream_id, topic_id, partitioning, messages) @@ -145,6 +163,11 @@ impl MessageClient for ClientWrapper { .flush_unsaved_buffer(stream_id, topic_id, partitioning_id, fsync) .await } + ClientWrapper::TcpSync(client) => { + client + .flush_unsaved_buffer(stream_id, topic_id, partitioning_id, fsync) + .await + } ClientWrapper::Quic(client) => { client .flush_unsaved_buffer(stream_id, topic_id, partitioning_id, fsync) diff --git a/core/sdk/src/client_wrappers/binary_partition_client.rs b/core/sdk/src/client_wrappers/binary_partition_client.rs index 26ccad1d..ceee0ad8 100644 --- a/core/sdk/src/client_wrappers/binary_partition_client.rs +++ b/core/sdk/src/client_wrappers/binary_partition_client.rs @@ -50,6 +50,11 @@ impl PartitionClient for ClientWrapper { .create_partitions(stream_id, topic_id, partitions_count) .await } + ClientWrapper::TcpSync(client) => { + client + .create_partitions(stream_id, topic_id, partitions_count) + .await + } } } @@ -80,6 +85,11 @@ impl PartitionClient for ClientWrapper { .delete_partitions(stream_id, topic_id, partitions_count) .await } + ClientWrapper::TcpSync(client) => { + client + .delete_partitions(stream_id, topic_id, partitions_count) + .await + } } } } diff --git a/core/sdk/src/client_wrappers/binary_personal_access_token_client.rs b/core/sdk/src/client_wrappers/binary_personal_access_token_client.rs index 58c6fbed..98dd829f 100644 --- a/core/sdk/src/client_wrappers/binary_personal_access_token_client.rs +++ b/core/sdk/src/client_wrappers/binary_personal_access_token_client.rs @@ -32,6 +32,7 @@ impl PersonalAccessTokenClient for ClientWrapper { ClientWrapper::Http(client) => client.get_personal_access_tokens().await, ClientWrapper::Tcp(client) => client.get_personal_access_tokens().await, ClientWrapper::Quic(client) => client.get_personal_access_tokens().await, + ClientWrapper::TcpSync(client) => client.get_personal_access_tokens().await, } } @@ -45,6 +46,7 @@ impl PersonalAccessTokenClient for ClientWrapper { ClientWrapper::Http(client) => client.create_personal_access_token(name, expiry).await, ClientWrapper::Tcp(client) => client.create_personal_access_token(name, expiry).await, ClientWrapper::Quic(client) => client.create_personal_access_token(name, expiry).await, + ClientWrapper::TcpSync(client) => client.create_personal_access_token(name, expiry).await, } } @@ -54,6 +56,7 @@ impl PersonalAccessTokenClient for ClientWrapper { ClientWrapper::Http(client) => client.delete_personal_access_token(name).await, ClientWrapper::Tcp(client) => client.delete_personal_access_token(name).await, ClientWrapper::Quic(client) => client.delete_personal_access_token(name).await, + ClientWrapper::TcpSync(client) => client.delete_personal_access_token(name).await, } } @@ -66,6 +69,7 @@ impl PersonalAccessTokenClient for ClientWrapper { ClientWrapper::Http(client) => client.login_with_personal_access_token(token).await, ClientWrapper::Tcp(client) => client.login_with_personal_access_token(token).await, ClientWrapper::Quic(client) => client.login_with_personal_access_token(token).await, + ClientWrapper::TcpSync(client) => client.login_with_personal_access_token(token).await, } } } diff --git a/core/sdk/src/client_wrappers/binary_segment_client.rs b/core/sdk/src/client_wrappers/binary_segment_client.rs index 0ec082ff..53845df0 100644 --- a/core/sdk/src/client_wrappers/binary_segment_client.rs +++ b/core/sdk/src/client_wrappers/binary_segment_client.rs @@ -51,6 +51,11 @@ impl SegmentClient for ClientWrapper { .delete_segments(stream_id, topic_id, partition_id, segments_count) .await } + ClientWrapper::TcpSync(client) => { + client + .delete_segments(stream_id, topic_id, partition_id, segments_count) + .await + } } } } diff --git a/core/sdk/src/client_wrappers/binary_stream_client.rs b/core/sdk/src/client_wrappers/binary_stream_client.rs index 93d785dd..37ee448c 100644 --- a/core/sdk/src/client_wrappers/binary_stream_client.rs +++ b/core/sdk/src/client_wrappers/binary_stream_client.rs @@ -29,6 +29,7 @@ impl StreamClient for ClientWrapper { ClientWrapper::Http(client) => client.get_stream(stream_id).await, ClientWrapper::Tcp(client) => client.get_stream(stream_id).await, ClientWrapper::Quic(client) => client.get_stream(stream_id).await, + ClientWrapper::TcpSync(client) => client.get_stream(stream_id).await, } } @@ -38,6 +39,7 @@ impl StreamClient for ClientWrapper { ClientWrapper::Http(client) => client.get_streams().await, ClientWrapper::Tcp(client) => client.get_streams().await, ClientWrapper::Quic(client) => client.get_streams().await, + ClientWrapper::TcpSync(client) => client.get_streams().await, } } @@ -51,6 +53,7 @@ impl StreamClient for ClientWrapper { ClientWrapper::Http(client) => client.create_stream(name, stream_id).await, ClientWrapper::Tcp(client) => client.create_stream(name, stream_id).await, ClientWrapper::Quic(client) => client.create_stream(name, stream_id).await, + ClientWrapper::TcpSync(client) => client.create_stream(name, stream_id).await, } } @@ -60,6 +63,7 @@ impl StreamClient for ClientWrapper { ClientWrapper::Http(client) => client.update_stream(stream_id, name).await, ClientWrapper::Tcp(client) => client.update_stream(stream_id, name).await, ClientWrapper::Quic(client) => client.update_stream(stream_id, name).await, + ClientWrapper::TcpSync(client) => client.update_stream(stream_id, name).await, } } @@ -69,6 +73,7 @@ impl StreamClient for ClientWrapper { ClientWrapper::Http(client) => client.delete_stream(stream_id).await, ClientWrapper::Tcp(client) => client.delete_stream(stream_id).await, ClientWrapper::Quic(client) => client.delete_stream(stream_id).await, + ClientWrapper::TcpSync(client) => client.delete_stream(stream_id).await, } } @@ -78,6 +83,7 @@ impl StreamClient for ClientWrapper { ClientWrapper::Http(client) => client.purge_stream(stream_id).await, ClientWrapper::Tcp(client) => client.purge_stream(stream_id).await, ClientWrapper::Quic(client) => client.purge_stream(stream_id).await, + ClientWrapper::TcpSync(client) => client.purge_stream(stream_id).await, } } } diff --git a/core/sdk/src/client_wrappers/binary_system_client.rs b/core/sdk/src/client_wrappers/binary_system_client.rs index 825dbd8e..af69c08f 100644 --- a/core/sdk/src/client_wrappers/binary_system_client.rs +++ b/core/sdk/src/client_wrappers/binary_system_client.rs @@ -32,6 +32,7 @@ impl SystemClient for ClientWrapper { ClientWrapper::Http(client) => client.get_stats().await, ClientWrapper::Tcp(client) => client.get_stats().await, ClientWrapper::Quic(client) => client.get_stats().await, + ClientWrapper::TcpSync(client) => client.get_stats().await, } } @@ -41,6 +42,7 @@ impl SystemClient for ClientWrapper { ClientWrapper::Http(client) => client.get_me().await, ClientWrapper::Tcp(client) => client.get_me().await, ClientWrapper::Quic(client) => client.get_me().await, + ClientWrapper::TcpSync(client) => client.get_me().await, } } @@ -50,6 +52,7 @@ impl SystemClient for ClientWrapper { ClientWrapper::Http(client) => client.get_client(client_id).await, ClientWrapper::Tcp(client) => client.get_client(client_id).await, ClientWrapper::Quic(client) => client.get_client(client_id).await, + ClientWrapper::TcpSync(client) => client.get_client(client_id).await, } } @@ -59,6 +62,7 @@ impl SystemClient for ClientWrapper { ClientWrapper::Http(client) => client.get_clients().await, ClientWrapper::Tcp(client) => client.get_clients().await, ClientWrapper::Quic(client) => client.get_clients().await, + ClientWrapper::TcpSync(client) => client.get_clients().await, } } @@ -68,6 +72,7 @@ impl SystemClient for ClientWrapper { ClientWrapper::Http(client) => client.ping().await, ClientWrapper::Tcp(client) => client.ping().await, ClientWrapper::Quic(client) => client.ping().await, + ClientWrapper::TcpSync(client) => client.ping().await, } } @@ -77,6 +82,7 @@ impl SystemClient for ClientWrapper { ClientWrapper::Http(client) => client.heartbeat_interval().await, ClientWrapper::Tcp(client) => client.heartbeat_interval().await, ClientWrapper::Quic(client) => client.heartbeat_interval().await, + ClientWrapper::TcpSync(client) => client.heartbeat_interval().await, } } @@ -90,6 +96,7 @@ impl SystemClient for ClientWrapper { ClientWrapper::Http(client) => client.snapshot(compression, snapshot_types).await, ClientWrapper::Tcp(client) => client.snapshot(compression, snapshot_types).await, ClientWrapper::Quic(client) => client.snapshot(compression, snapshot_types).await, + ClientWrapper::TcpSync(client) => client.snapshot(compression, snapshot_types).await, } } } diff --git a/core/sdk/src/client_wrappers/binary_topic_client.rs b/core/sdk/src/client_wrappers/binary_topic_client.rs index c9685bdb..ae6e733e 100644 --- a/core/sdk/src/client_wrappers/binary_topic_client.rs +++ b/core/sdk/src/client_wrappers/binary_topic_client.rs @@ -35,6 +35,7 @@ impl TopicClient for ClientWrapper { ClientWrapper::Http(client) => client.get_topic(stream_id, topic_id).await, ClientWrapper::Tcp(client) => client.get_topic(stream_id, topic_id).await, ClientWrapper::Quic(client) => client.get_topic(stream_id, topic_id).await, + ClientWrapper::TcpSync(client) => client.get_topic(stream_id, topic_id).await, } } @@ -44,6 +45,7 @@ impl TopicClient for ClientWrapper { ClientWrapper::Http(client) => client.get_topics(stream_id).await, ClientWrapper::Tcp(client) => client.get_topics(stream_id).await, ClientWrapper::Quic(client) => client.get_topics(stream_id).await, + ClientWrapper::TcpSync(client) => client.get_topics(stream_id).await, } } @@ -115,6 +117,20 @@ impl TopicClient for ClientWrapper { ) .await } + ClientWrapper::TcpSync(client) => { + client + .create_topic( + stream_id, + name, + partitions_count, + compression_algorithm, + replication_factor, + topic_id, + message_expiry, + max_topic_size, + ) + .await + } } } @@ -181,6 +197,19 @@ impl TopicClient for ClientWrapper { ) .await } + ClientWrapper::TcpSync(client) => { + client + .update_topic( + stream_id, + topic_id, + name, + compression_algorithm, + replication_factor, + message_expiry, + max_topic_size, + ) + .await + } } } @@ -194,6 +223,7 @@ impl TopicClient for ClientWrapper { ClientWrapper::Http(client) => client.delete_topic(stream_id, topic_id).await, ClientWrapper::Tcp(client) => client.delete_topic(stream_id, topic_id).await, ClientWrapper::Quic(client) => client.delete_topic(stream_id, topic_id).await, + ClientWrapper::TcpSync(client) => client.delete_topic(stream_id, topic_id).await, } } @@ -207,6 +237,7 @@ impl TopicClient for ClientWrapper { ClientWrapper::Http(client) => client.purge_topic(stream_id, topic_id).await, ClientWrapper::Tcp(client) => client.purge_topic(stream_id, topic_id).await, ClientWrapper::Quic(client) => client.purge_topic(stream_id, topic_id).await, + ClientWrapper::TcpSync(client) => client.purge_topic(stream_id, topic_id).await, } } } diff --git a/core/sdk/src/client_wrappers/binary_user_client.rs b/core/sdk/src/client_wrappers/binary_user_client.rs index f7289c24..d05174f4 100644 --- a/core/sdk/src/client_wrappers/binary_user_client.rs +++ b/core/sdk/src/client_wrappers/binary_user_client.rs @@ -31,6 +31,7 @@ impl UserClient for ClientWrapper { ClientWrapper::Http(client) => client.get_user(user_id).await, ClientWrapper::Tcp(client) => client.get_user(user_id).await, ClientWrapper::Quic(client) => client.get_user(user_id).await, + ClientWrapper::TcpSync(client) => client.get_user(user_id).await, } } @@ -40,6 +41,7 @@ impl UserClient for ClientWrapper { ClientWrapper::Http(client) => client.get_users().await, ClientWrapper::Tcp(client) => client.get_users().await, ClientWrapper::Quic(client) => client.get_users().await, + ClientWrapper::TcpSync(client) => client.get_users().await, } } @@ -71,6 +73,11 @@ impl UserClient for ClientWrapper { .create_user(username, password, status, permissions) .await } + ClientWrapper::TcpSync(client) => { + client + .create_user(username, password, status, permissions) + .await + } } } @@ -80,6 +87,7 @@ impl UserClient for ClientWrapper { ClientWrapper::Tcp(client) => client.delete_user(user_id).await, ClientWrapper::Quic(client) => client.delete_user(user_id).await, ClientWrapper::Iggy(client) => client.delete_user(user_id).await, + ClientWrapper::TcpSync(client) => client.delete_user(user_id).await, } } @@ -94,6 +102,7 @@ impl UserClient for ClientWrapper { ClientWrapper::Tcp(client) => client.update_user(user_id, username, status).await, ClientWrapper::Quic(client) => client.update_user(user_id, username, status).await, ClientWrapper::Iggy(client) => client.update_user(user_id, username, status).await, + ClientWrapper::TcpSync(client) => client.update_user(user_id, username, status).await, } } @@ -107,6 +116,7 @@ impl UserClient for ClientWrapper { ClientWrapper::Http(client) => client.update_permissions(user_id, permissions).await, ClientWrapper::Tcp(client) => client.update_permissions(user_id, permissions).await, ClientWrapper::Quic(client) => client.update_permissions(user_id, permissions).await, + ClientWrapper::TcpSync(client) => client.update_permissions(user_id, permissions).await, } } @@ -137,6 +147,11 @@ impl UserClient for ClientWrapper { .change_password(user_id, current_password, new_password) .await } + ClientWrapper::TcpSync(client) => { + client + .change_password(user_id, current_password, new_password) + .await + } } } @@ -146,6 +161,7 @@ impl UserClient for ClientWrapper { ClientWrapper::Http(client) => client.login_user(username, password).await, ClientWrapper::Tcp(client) => client.login_user(username, password).await, ClientWrapper::Quic(client) => client.login_user(username, password).await, + ClientWrapper::TcpSync(client) => client.login_user(username, password).await, } } @@ -155,6 +171,7 @@ impl UserClient for ClientWrapper { ClientWrapper::Http(client) => client.logout_user().await, ClientWrapper::Tcp(client) => client.logout_user().await, ClientWrapper::Quic(client) => client.logout_user().await, + ClientWrapper::TcpSync(client) => client.logout_user().await, } } } diff --git a/core/sdk/src/client_wrappers/client_wrapper.rs b/core/sdk/src/client_wrappers/client_wrapper.rs index d391bfda..12f758f5 100644 --- a/core/sdk/src/client_wrappers/client_wrapper.rs +++ b/core/sdk/src/client_wrappers/client_wrapper.rs @@ -27,5 +27,6 @@ pub enum ClientWrapper { Iggy(IggyClient), Http(HttpClient), Tcp(TcpClient), + TcpSync(Box<dyn iggy_binary_protocol::Client>), Quic(QuicClient), } diff --git a/core/sdk/src/connection/mod.rs b/core/sdk/src/connection/mod.rs new file mode 100644 index 00000000..bfc7a330 --- /dev/null +++ b/core/sdk/src/connection/mod.rs @@ -0,0 +1 @@ +pub mod transport; diff --git a/core/sdk/src/connection/transport.rs b/core/sdk/src/connection/transport.rs new file mode 100644 index 00000000..b2ec7304 --- /dev/null +++ b/core/sdk/src/connection/transport.rs @@ -0,0 +1,178 @@ +use std::io::{Read, Write}; +use std::net::{SocketAddr, TcpStream}; +use std::str::FromStr; +use std::sync::Arc; + +use tracing::error; +use iggy_common::{AutoLogin, IggyDuration, IggyError, TcpClientConfig}; +use rustls::pki_types::pem::PemObject; +use rustls::pki_types::{CertificateDer, ServerName}; +use rustls::{ClientConnection, StreamOwned}; + +pub trait ClientConfig { + fn server_address(&self) -> SocketAddr; + fn auto_login(&self) -> AutoLogin; + fn reconnection_reestablish_after(&self) -> IggyDuration; + fn reconnection_max_retries(&self) -> Option<u32>; + fn reconnection_enabled(&self) -> bool; + fn reconnection_interval(&self) -> IggyDuration; + fn heartbeat_interval(&self) -> IggyDuration; +} + +impl ClientConfig for TcpClientConfig { + fn auto_login(&self) -> AutoLogin { + self.auto_login.clone() + } + + fn heartbeat_interval(&self) -> IggyDuration { + self.heartbeat_interval + } + + fn reconnection_max_retries(&self) -> Option<u32> { + self.reconnection.max_retries + } + + fn reconnection_reestablish_after(&self) -> IggyDuration { + self.reconnection.reestablish_after + } + + fn reconnection_enabled(&self) -> bool { + self.reconnection.enabled + } + + fn reconnection_interval(&self) -> IggyDuration { + self.reconnection.interval + } + + fn server_address(&self) -> SocketAddr { + SocketAddr::from_str(&self.server_address).unwrap() + } +} + +pub trait Transport { + type Stream: Read + Write; + type Config: ClientConfig; + + fn connect( + cfg: Arc<Self::Config>, // TODO maybe should remove arc + server_address: SocketAddr, + ) -> Result<Self::Stream, IggyError>; + + fn shutdown(stream: &mut Self::Stream) -> Result<(), IggyError>; +} + +#[derive(Debug)] +pub struct TcpTransport; + +impl Transport for TcpTransport { + type Stream = TcpStream; + type Config = TcpClientConfig; + + fn connect( + cfg: Arc<Self::Config>, + server_address: SocketAddr, + ) -> Result<Self::Stream, IggyError> { + let stream = TcpStream::connect(server_address).map_err(|e| { + error!("Failed to establish a TCP connection to the server: {e}",); + IggyError::CannotEstablishConnection + })?; + if let Err(e) = stream.set_nodelay(cfg.nodelay) { + error!("Failed to set the nodelay option on the client: {e}, continuing...",); + } + Ok(stream) + } + + fn shutdown(stream: &mut Self::Stream) -> Result<(), IggyError> { + stream.shutdown(std::net::Shutdown::Both).map_err(|e| { + error!( + "Failed to shutdown the TCP connection to the TCP connection: {e}", + ); + IggyError::TcpError + }) + } +} + +#[derive(Debug)] +pub struct TcpTlsTransport; + +impl Transport for TcpTlsTransport { + type Stream = StreamOwned<ClientConnection, TcpStream>; + type Config = TcpClientConfig; + + fn connect( + cfg: Arc<Self::Config>, + server_address: SocketAddr, + ) -> Result<Self::Stream, IggyError> { + let stream = TcpStream::connect(server_address).map_err(|e| { + error!("Failed to establish a TLS connection to the server: {e}",); + IggyError::CannotEstablishConnection + })?; + if let Err(e) = stream.set_nodelay(cfg.nodelay) { + error!("Failed to set the nodelay option on the client: {e}, continuing...",); + } + + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + + let config = if cfg.tls_validate_certificate { + let mut root_cert_store = rustls::RootCertStore::empty(); + if let Some(certificate_path) = cfg.tls_ca_file.clone() { + for cert in + CertificateDer::pem_file_iter(&certificate_path).map_err(|error| { + error!("Failed to read the CA file: {certificate_path}. {error}",); + IggyError::InvalidTlsCertificatePath + })? + { + let certificate = cert.map_err(|error| { + error!( + "Failed to read a certificate from the CA file: {certificate_path}. {error}", + ); + IggyError::InvalidTlsCertificate + })?; + root_cert_store.add(certificate).map_err(|error| { + error!( + "Failed to add a certificate to the root certificate store. {error}", + ); + IggyError::InvalidTlsCertificate + })?; + } + } else { + root_cert_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); + } + + rustls::ClientConfig::builder() + .with_root_certificates(root_cert_store) + .with_no_client_auth() + } else { + use crate::tcp::tcp_tls_verifier::NoServerVerification; + rustls::ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(Arc::new(NoServerVerification)) + .with_no_client_auth() + }; + + let tls_domain = cfg.tls_domain.to_owned(); + let domain = ServerName::try_from(tls_domain).map_err(|error| { + error!("Failed to create a server name from the domain. {error}",); + IggyError::InvalidTlsDomain + })?; + + + let conn = ClientConnection::new(Arc::new(config), domain) + .map_err(|e| { + error!("Failed to establish a TCP/TLS connection to the server: {e}",); + IggyError::CannotEstablishConnection + })?; + + Ok(StreamOwned::new(conn, stream)) + } + + fn shutdown(stream: &mut Self::Stream) -> Result<(), IggyError> { + stream.conn.send_close_notify(); + stream.sock.shutdown(std::net::Shutdown::Both).map_err(|e| { + error!( + "Failed to shutdown the TCP/TLS connection to the TCP connection: {e}", + ); + IggyError::TcpError + }) + } +} diff --git a/core/sdk/src/lib.rs b/core/sdk/src/lib.rs index c315fd5e..52e7e118 100644 --- a/core/sdk/src/lib.rs +++ b/core/sdk/src/lib.rs @@ -26,3 +26,5 @@ pub mod prelude; pub mod quic; pub mod stream_builder; pub mod tcp; +pub mod protocol; +pub mod connection; diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs index 1e1ee111..e93112a1 100644 --- a/core/sdk/src/prelude.rs +++ b/core/sdk/src/prelude.rs @@ -45,6 +45,7 @@ pub use crate::stream_builder::IggyStreamConsumer; pub use crate::stream_builder::{IggyProducerConfig, IggyStreamProducer}; pub use crate::stream_builder::{IggyStream, IggyStreamConfig}; pub use crate::tcp::tcp_client::TcpClient; +pub use crate::tcp::tcp_client_sync::{TcpClientSync, TcpClientSyncTcp, TcpClientSyncTls}; pub use iggy_binary_protocol::{ Client, ConsumerGroupClient, ConsumerOffsetClient, MessageClient, PartitionClient, PersonalAccessTokenClient, SegmentClient, StreamClient, SystemClient, TopicClient, UserClient, diff --git a/core/sdk/src/protocol/core.rs b/core/sdk/src/protocol/core.rs new file mode 100644 index 00000000..31e30d54 --- /dev/null +++ b/core/sdk/src/protocol/core.rs @@ -0,0 +1,796 @@ +use std::{ + collections::VecDeque, + net::SocketAddr, +}; + +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use iggy_common::{AutoLogin, ClientState, Credentials, IggyDuration, IggyError, IggyErrorDiscriminants, IggyTimestamp}; +use tracing::{debug, error, info, warn}; + +const REQUEST_INITIAL_BYTES_LENGTH: usize = 4; +const REQUEST_HEADER_BYTES: usize = 8; +const RESPONSE_INITIAL_BYTES_LENGTH: usize = 8; +#[derive(Debug)] +pub struct ProtocolCoreConfig { + pub auto_login: AutoLogin, + pub reestablish_after: IggyDuration, + pub max_retries: Option<u32>, + pub reconnection_enabled: bool, + pub reconnection_interval: IggyDuration, +} + +#[derive(Debug)] +pub enum ControlAction { + Connect(SocketAddr), + Wait(IggyDuration), + Authenticate { username: String, password: String }, + Noop, + Error(IggyError), +} + +pub struct TxBuf { + pub header: [u8; 8], + pub payload: Bytes, + pub request_id: u64, +} + +impl TxBuf { + #[inline] + pub fn total_len(&self) -> usize { + REQUEST_HEADER_BYTES + self.payload.len() + } +} + +#[derive(Debug)] +pub struct ProtocolCore { + pub state: ClientState, + config: ProtocolCoreConfig, + last_connect_attempt: Option<IggyTimestamp>, + last_reconnection_attempt: Option<IggyTimestamp>, + pub retry_count: u32, + pub connection_retry_count: u32, + next_request_id: u64, + pending_sends: VecDeque<(u32, Bytes, u64)>, + sent_order: VecDeque<u64>, + auth_pending: bool, + auth_request_id: Option<u64>, + server_address: Option<SocketAddr>, + last_auth_result: Option<Result<(), IggyError>>, + connection_established: bool, +} + +impl ProtocolCore { + pub fn new(config: ProtocolCoreConfig) -> Self { + Self { + state: ClientState::Disconnected, + config, + last_connect_attempt: None, + last_reconnection_attempt: None, + retry_count: 0, + connection_retry_count: 0, + next_request_id: 1, + pending_sends: VecDeque::new(), + sent_order: VecDeque::new(), + auth_pending: false, + auth_request_id: None, + server_address: None, + last_auth_result: None, + connection_established: false, + } + } + + pub fn poll_transmit(&mut self) -> Option<TxBuf> { + if let Some((code, payload, request_id)) = self.pending_sends.pop_front() { + let total_len = (payload.len() + REQUEST_INITIAL_BYTES_LENGTH) as u32; + self.sent_order.push_back(request_id); + + Some(TxBuf { + payload, + header: make_header(total_len, code), + request_id, + }) + } else { + None + } + } + + pub fn send(&mut self, code: u32, payload: Bytes) -> Result<u64, IggyError> { + match self.state { + ClientState::Shutdown => Err(IggyError::ClientShutdown), + ClientState::Disconnected | ClientState::Connecting => Err(IggyError::NotConnected), + ClientState::Connected | ClientState::Authenticating | ClientState::Authenticated => { + Ok(self.queue_send(code, payload)) + } + } + } + + fn queue_send(&mut self, code: u32, payload: Bytes) -> u64 { + let request_id = self.next_request_id; + self.next_request_id += 1; + self.pending_sends.push_back((code, payload, request_id)); + request_id + } + + pub fn process_incoming_with<F: FnMut(u64, u32, Bytes)>( + &mut self, + buf: &mut BytesMut, + mut f: F, + ) { + loop { + if buf.len() < RESPONSE_INITIAL_BYTES_LENGTH { + break; + } + let status = u32::from_le_bytes(buf[..4].try_into().unwrap()); + let length = u32::from_le_bytes(buf[4..8].try_into().unwrap()); + let total = RESPONSE_INITIAL_BYTES_LENGTH + length as usize; + if buf.len() < total { + break; + } + + buf.advance(RESPONSE_INITIAL_BYTES_LENGTH); + let payload = if length <= 1 { + Bytes::new() + } else { + buf.split_to(length as usize).freeze() + }; + + // Handle response status logging + if status != 0 { + self.handle_error_response(status); + } + + if let Some(id) = self.on_response(status) { + f(id, status, payload); + } + } + } + + /// Handle error response logging based on status code + fn handle_error_response(&self, status: u32) { + // TEMP: See https://github.com/apache/iggy/pull/604 for context. + if status == IggyErrorDiscriminants::TopicIdAlreadyExists as u32 + || status == IggyErrorDiscriminants::TopicNameAlreadyExists as u32 + || status == IggyErrorDiscriminants::StreamIdAlreadyExists as u32 + || status == IggyErrorDiscriminants::StreamNameAlreadyExists as u32 + || status == IggyErrorDiscriminants::UserAlreadyExists as u32 + || status == IggyErrorDiscriminants::PersonalAccessTokenAlreadyExists as u32 + || status == IggyErrorDiscriminants::ConsumerGroupIdAlreadyExists as u32 + || status == IggyErrorDiscriminants::ConsumerGroupNameAlreadyExists as u32 + { + debug!( + "Received a server resource already exists response: {} ({})", + status, + IggyError::from_code_as_string(status) + ) + } else { + error!( + "Received an invalid response with status: {} ({}).", + status, + IggyError::from_code_as_string(status), + ); + } + } + + pub fn on_response(&mut self, status: u32) -> Option<u64> { + let request_id = self.sent_order.pop_front()?; + + if Some(request_id) == self.auth_request_id { + if status == 0 { + debug!("Authentication successful"); + self.state = ClientState::Authenticated; + self.auth_pending = false; + self.last_auth_result = Some(Ok(())); + } else { + warn!("Authentication failed with status: {}", status); + self.state = ClientState::Connected; + self.auth_pending = false; + self.last_auth_result = Some(Err(IggyError::Unauthenticated)); + } + self.auth_request_id = None; + } + + Some(request_id) + } + + pub fn poll(&mut self) -> ControlAction { + match self.state { + ClientState::Shutdown => ControlAction::Error(IggyError::ClientShutdown), + ClientState::Disconnected => ControlAction::Noop, + ClientState::Authenticated | ClientState::Authenticating | ClientState::Connected => { + ControlAction::Noop + } + ClientState::Connecting => { + let server_address = match self.server_address { + Some(addr) => addr, + None => return ControlAction::Error(IggyError::ConnectionMissedSocket), + }; + + // Check if reconnection is enabled + if !self.config.reconnection_enabled { + if self.connection_retry_count > 0 { + warn!("Automatic reconnection is disabled."); + return ControlAction::Error(IggyError::CannotEstablishConnection); + } + } + + // Handle timing for reestablish_after (initial connect timing) + if let Some(last_reconnection) = self.last_reconnection_attempt { + let now = IggyTimestamp::now(); + let elapsed = now.as_micros().saturating_sub(last_reconnection.as_micros()); + if elapsed < self.config.reestablish_after.as_micros() { + let remaining = IggyDuration::from(self.config.reestablish_after.as_micros() - elapsed); + info!( + "Trying to connect to the server in: {remaining}", + remaining = remaining.as_human_time_string() + ); + return ControlAction::Wait(remaining); + } + } + + // Handle timing for reconnection interval (between retries) + if let Some(last_attempt) = self.last_connect_attempt { + let now = IggyTimestamp::now(); + let elapsed = now.as_micros().saturating_sub(last_attempt.as_micros()); + if elapsed < self.config.reconnection_interval.as_micros() { + let remaining = IggyDuration::from(self.config.reconnection_interval.as_micros() - elapsed); + return ControlAction::Wait(remaining); + } + } + + // Check retry limits + let unlimited_retries = self.config.max_retries.is_none(); + let max_retries = self.config.max_retries.unwrap_or_default(); + + if !unlimited_retries && self.connection_retry_count >= max_retries { + error!( + "Maximum retry attempts ({}) exceeded for server: {:?}", + max_retries, server_address + ); + return ControlAction::Error(IggyError::MaxRetriesExceeded); + } + + // Increment retry count and log attempt + self.connection_retry_count += 1; + self.last_connect_attempt = Some(IggyTimestamp::now()); + + let max_retries_str = if let Some(max_retries) = self.config.max_retries { + max_retries.to_string() + } else { + "unlimited".to_string() + }; + + if self.connection_retry_count > 1 { + let interval_str = self.config.reconnection_interval.as_human_time_string(); + info!( + "Retrying to connect to server ({}/{max_retries_str}): {server_address:?} in: {interval_str}", + self.connection_retry_count, + max_retries_str = max_retries_str, + server_address = server_address, + interval_str = interval_str + ); + } else { + info!( + "Iggy client is connecting to server: {server_address:?}...", + server_address = server_address + ); + } + + ControlAction::Connect(server_address) + } + } + } + + pub fn desire_connect(&mut self, server_address: SocketAddr) -> Result<(), IggyError> { + match self.state { + ClientState::Shutdown => return Err(IggyError::ClientShutdown), + ClientState::Connecting => return Ok(()), + ClientState::Connected | ClientState::Authenticating | ClientState::Authenticated => { + return Ok(()); + } + _ => { + self.state = ClientState::Connecting; + self.server_address = Some(server_address); + } + } + + Ok(()) + } + + pub fn on_connected(&mut self) -> Result<(), IggyError> { + debug!("Transport connected"); + if self.state != ClientState::Connecting { + return Err(IggyError::IncorrectConnectionState); + } + self.state = ClientState::Connected; + self.connection_established = true; + // Reset retry counters on successful connection + self.retry_count = 0; + self.connection_retry_count = 0; + + match &self.config.auto_login { + AutoLogin::Disabled => { + info!("Automatic sign-in is disabled."); + } + AutoLogin::Enabled(credentials) => { + if !self.auth_pending { + self.state = ClientState::Authenticating; + self.auth_pending = true; + + match credentials { + Credentials::UsernamePassword(username, password) => { + let auth_payload = encode_auth(&username, &password); + let auth_id = self.queue_send(0x0A, auth_payload); + self.auth_request_id = Some(auth_id); + } + Credentials::PersonalAccessToken(token) => { + let auth_payload = encode_pat_auth(token); + let auth_id = self.queue_send(0x0B, auth_payload); // Different command code for PAT + self.auth_request_id = Some(auth_id); + } + } + } + } + } + + Ok(()) + } + + pub fn disconnect(&mut self) { + debug!("Transport disconnected"); + self.state = ClientState::Disconnected; + self.auth_pending = false; + self.auth_request_id = None; + self.sent_order.clear(); + + // Set last reconnection attempt time if we had an established connection + if self.connection_established { + self.last_reconnection_attempt = Some(IggyTimestamp::now()); + self.connection_established = false; + } + } + + pub fn shutdown(&mut self) { + self.state = ClientState::Shutdown; + self.auth_pending = false; + self.auth_request_id = None; + self.sent_order.clear(); + } + + pub fn should_wait_auth(&self) -> bool { + matches!(self.config.auto_login, AutoLogin::Enabled(_)) && self.auth_pending + } + + pub fn take_auth_result(&mut self) -> Option<Result<(), IggyError>> { + self.last_auth_result.take() + } + + /// Check if we should attempt reconnection for the given error + pub fn should_reconnect_for_error(&self, error: &IggyError) -> bool { + if !self.config.reconnection_enabled { + return false; + } + + matches!( + error, + IggyError::Disconnected + | IggyError::EmptyResponse + | IggyError::Unauthenticated + | IggyError::StaleClient + ) + } + + /// Initiate reconnection process + pub fn initiate_reconnection(&mut self, server_address: SocketAddr) -> Result<(), IggyError> { + info!( + "Reconnecting to the server: {server_address:?} by client...", + server_address = server_address + ); + + self.disconnect(); + self.desire_connect(server_address) + } + + /// Get current retry information for logging + pub fn get_retry_info(&self) -> (u32, String, String) { + let max_retries_str = if let Some(max_retries) = self.config.max_retries { + max_retries.to_string() + } else { + "unlimited".to_string() + }; + + let interval_str = self.config.reconnection_interval.as_human_time_string(); + + (self.connection_retry_count, max_retries_str, interval_str) + } +} + +fn encode_auth(username: &str, password: &str) -> Bytes { + let mut buf = BytesMut::new(); + buf.put_u32_le(username.len() as u32); + buf.put_slice(username.as_bytes()); + buf.put_u32_le(password.len() as u32); + buf.put_slice(password.as_bytes()); + buf.freeze() +} + +fn encode_pat_auth(token: &str) -> Bytes { + let mut buf = BytesMut::new(); + buf.put_u32_le(token.len() as u32); + buf.put_slice(token.as_bytes()); + buf.freeze() +} + +fn make_header(total_len: u32, code: u32) -> [u8; 8] { + let mut h = [0u8; 8]; + h[..4].copy_from_slice(&total_len.to_le_bytes()); + h[4..].copy_from_slice(&code.to_le_bytes()); + h +} + +#[cfg(test)] +mod tests { + use super::*; + use std::net::SocketAddr; + use std::str::FromStr; + + fn create_test_config() -> ProtocolCoreConfig { + ProtocolCoreConfig { + auto_login: AutoLogin::Disabled, + reestablish_after: IggyDuration::from_str("5s").unwrap(), + max_retries: Some(3), + reconnection_enabled: true, + reconnection_interval: IggyDuration::from_str("1s").unwrap(), + } + } + + #[test] + fn test_protocol_core_creation() { + let config = create_test_config(); + let core = ProtocolCore::new(config); + + assert_eq!(core.state, ClientState::Disconnected); + assert_eq!(core.retry_count, 0); + assert_eq!(core.connection_retry_count, 0); + assert_eq!(core.next_request_id, 1); + assert!(core.pending_sends.is_empty()); + assert!(core.sent_order.is_empty()); + assert!(!core.auth_pending); + assert!(core.auth_request_id.is_none()); + assert!(core.server_address.is_none()); + assert!(core.last_auth_result.is_none()); + assert!(!core.connection_established); + } + + #[test] + fn test_state_transitions_connecting() { + let config = create_test_config(); + let mut core = ProtocolCore::new(config); + let addr = SocketAddr::from_str("127.0.0.1:8080").unwrap(); + + // Test desire_connect + core.desire_connect(addr).unwrap(); + assert_eq!(core.state, ClientState::Connecting); + assert_eq!(core.server_address, Some(addr)); + } + + #[test] + fn test_on_connected_success() { + let config = create_test_config(); + let mut core = ProtocolCore::new(config); + let addr = SocketAddr::from_str("127.0.0.1:8080").unwrap(); + + core.desire_connect(addr).unwrap(); + core.on_connected().unwrap(); + + assert_eq!(core.state, ClientState::Connected); + assert_eq!(core.retry_count, 0); + assert_eq!(core.connection_retry_count, 0); + assert!(core.connection_established); + } + + #[test] + fn test_on_connected_wrong_state() { + let config = create_test_config(); + let mut core = ProtocolCore::new(config); + + // Try to connect without being in Connecting state + let result = core.on_connected(); + assert!(result.is_err()); + assert_eq!(result.unwrap_err(), IggyError::IncorrectConnectionState); + } + + #[test] + fn test_disconnect() { + let config = create_test_config(); + let mut core = ProtocolCore::new(config); + let addr = SocketAddr::from_str("127.0.0.1:8080").unwrap(); + + core.desire_connect(addr).unwrap(); + core.on_connected().unwrap(); + + core.disconnect(); + + assert_eq!(core.state, ClientState::Disconnected); + assert!(!core.auth_pending); + assert!(core.auth_request_id.is_none()); + assert!(core.sent_order.is_empty()); + assert!(!core.connection_established); + assert!(core.last_reconnection_attempt.is_some()); + } + + #[test] + fn test_shutdown() { + let config = create_test_config(); + let mut core = ProtocolCore::new(config); + let addr = SocketAddr::from_str("127.0.0.1:8080").unwrap(); + + core.desire_connect(addr).unwrap(); + core.on_connected().unwrap(); + + core.shutdown(); + + assert_eq!(core.state, ClientState::Shutdown); + assert!(!core.auth_pending); + assert!(core.auth_request_id.is_none()); + } + + #[test] + fn test_retry_logic_with_limits() { + let config = ProtocolCoreConfig { + auto_login: AutoLogin::Disabled, + reestablish_after: IggyDuration::from_str("0ms").unwrap(), // No wait + max_retries: Some(2), + reconnection_enabled: true, + reconnection_interval: IggyDuration::from_str("0ms").unwrap(), // No wait + }; + let mut core = ProtocolCore::new(config); + let addr = SocketAddr::from_str("127.0.0.1:8080").unwrap(); + + core.desire_connect(addr).unwrap(); + + // First poll should return Connect + let action = core.poll(); + assert!(matches!(action, ControlAction::Connect(_))); + assert_eq!(core.connection_retry_count, 1); + + // Simulate connection failure + core.disconnect(); + core.desire_connect(addr).unwrap(); + + // Second poll should return Connect + let action = core.poll(); + assert!(matches!(action, ControlAction::Connect(_))); + assert_eq!(core.connection_retry_count, 2); + + // Simulate connection failure + core.disconnect(); + core.desire_connect(addr).unwrap(); + + // Third poll should return Error due to max retries exceeded + let action = core.poll(); + assert!(matches!(action, ControlAction::Error(IggyError::MaxRetriesExceeded))); + } + + #[test] + fn test_retry_logic_unlimited() { + let config = ProtocolCoreConfig { + auto_login: AutoLogin::Disabled, + reestablish_after: IggyDuration::from_str("0ms").unwrap(), + max_retries: None, // Unlimited + reconnection_enabled: true, + reconnection_interval: IggyDuration::from_str("0ms").unwrap(), + }; + let mut core = ProtocolCore::new(config); + let addr = SocketAddr::from_str("127.0.0.1:8080").unwrap(); + + core.desire_connect(addr).unwrap(); + + // Should always allow retries + for i in 1..=10 { + let action = core.poll(); + assert!(matches!(action, ControlAction::Connect(_))); + assert_eq!(core.connection_retry_count, i); + + core.disconnect(); + core.desire_connect(addr).unwrap(); + } + } + + #[test] + fn test_reconnection_disabled() { + let config = ProtocolCoreConfig { + auto_login: AutoLogin::Disabled, + reestablish_after: IggyDuration::from_str("0ms").unwrap(), + max_retries: Some(3), + reconnection_enabled: false, + reconnection_interval: IggyDuration::from_str("1s").unwrap(), + }; + let mut core = ProtocolCore::new(config); + let addr = SocketAddr::from_str("127.0.0.1:8080").unwrap(); + + core.desire_connect(addr).unwrap(); + + // First connection attempt should work + let action = core.poll(); + assert!(matches!(action, ControlAction::Connect(_))); + + // Simulate failure and retry + core.disconnect(); + core.desire_connect(addr).unwrap(); + + // Should fail immediately when reconnection is disabled + let action = core.poll(); + assert!(matches!(action, ControlAction::Error(IggyError::CannotEstablishConnection))); + } + + #[test] + fn test_should_reconnect_for_error() { + let config = create_test_config(); + let core = ProtocolCore::new(config); + + // Should reconnect for these errors + assert!(core.should_reconnect_for_error(&IggyError::Disconnected)); + assert!(core.should_reconnect_for_error(&IggyError::EmptyResponse)); + assert!(core.should_reconnect_for_error(&IggyError::Unauthenticated)); + assert!(core.should_reconnect_for_error(&IggyError::StaleClient)); + + // Should not reconnect for other errors + assert!(!core.should_reconnect_for_error(&IggyError::InvalidCommand)); + assert!(!core.should_reconnect_for_error(&IggyError::StreamIdNotFound(1))); + } + + #[test] + fn test_should_reconnect_disabled() { + let config = ProtocolCoreConfig { + auto_login: AutoLogin::Disabled, + reestablish_after: IggyDuration::from_str("5s").unwrap(), + max_retries: Some(3), + reconnection_enabled: false, + reconnection_interval: IggyDuration::from_str("1s").unwrap(), + }; + let core = ProtocolCore::new(config); + + // Should not reconnect when reconnection is disabled + assert!(!core.should_reconnect_for_error(&IggyError::Disconnected)); + assert!(!core.should_reconnect_for_error(&IggyError::EmptyResponse)); + } + + #[test] + fn test_initiate_reconnection() { + let config = create_test_config(); + let mut core = ProtocolCore::new(config); + let addr = SocketAddr::from_str("127.0.0.1:8080").unwrap(); + + // First establish connection + core.desire_connect(addr).unwrap(); + core.on_connected().unwrap(); + + // Initiate reconnection + core.initiate_reconnection(addr).unwrap(); + + assert_eq!(core.state, ClientState::Connecting); + assert_eq!(core.server_address, Some(addr)); + } + + #[test] + fn test_queue_send_and_request_id() { + let config = create_test_config(); + let mut core = ProtocolCore::new(config); + + let payload = Bytes::from("test message"); + let request_id = core.queue_send(0x10, payload.clone()); + + assert_eq!(request_id, 1); + assert_eq!(core.next_request_id, 2); + assert_eq!(core.pending_sends.len(), 1); + + let (code, queued_payload, queued_id) = &core.pending_sends[0]; + assert_eq!(*code, 0x10); + assert_eq!(*queued_payload, payload); + assert_eq!(*queued_id, request_id); + } + + #[test] + fn test_poll_transmit() { + let config = create_test_config(); + let mut core = ProtocolCore::new(config); + + // Queue a message + let payload = Bytes::from("test"); + core.queue_send(0x10, payload); + + // Poll should return the queued message + let tx_buf = core.poll_transmit(); + assert!(tx_buf.is_some()); + + let tx = tx_buf.unwrap(); + // Extract command code from header (last 4 bytes) + let command_code = u32::from_le_bytes(tx.header[4..8].try_into().unwrap()); + assert_eq!(command_code, 0x10); + assert_eq!(tx.payload, Bytes::from("test")); + + // Should be empty after polling + assert!(core.poll_transmit().is_none()); + } + + #[test] + fn test_auth_with_username_password() { + let config = ProtocolCoreConfig { + auto_login: AutoLogin::Enabled(Credentials::UsernamePassword( + "user".to_string(), + "pass".to_string(), + )), + reestablish_after: IggyDuration::from_str("5s").unwrap(), + max_retries: Some(3), + reconnection_enabled: true, + reconnection_interval: IggyDuration::from_str("1s").unwrap(), + }; + let mut core = ProtocolCore::new(config); + let addr = SocketAddr::from_str("127.0.0.1:8080").unwrap(); + + core.desire_connect(addr).unwrap(); + core.on_connected().unwrap(); + + assert_eq!(core.state, ClientState::Authenticating); + assert!(core.auth_pending); + assert!(core.auth_request_id.is_some()); + assert!(!core.pending_sends.is_empty()); + } + + #[test] + fn test_auth_with_pat() { + let config = ProtocolCoreConfig { + auto_login: AutoLogin::Enabled(Credentials::PersonalAccessToken( + "test-token".to_string(), + )), + reestablish_after: IggyDuration::from_str("5s").unwrap(), + max_retries: Some(3), + reconnection_enabled: true, + reconnection_interval: IggyDuration::from_str("1s").unwrap(), + }; + let mut core = ProtocolCore::new(config); + let addr = SocketAddr::from_str("127.0.0.1:8080").unwrap(); + + core.desire_connect(addr).unwrap(); + core.on_connected().unwrap(); + + assert_eq!(core.state, ClientState::Authenticating); + assert!(core.auth_pending); + assert!(core.auth_request_id.is_some()); + assert!(!core.pending_sends.is_empty()); + } + + #[test] + fn test_get_retry_info() { + let config = ProtocolCoreConfig { + auto_login: AutoLogin::Disabled, + reestablish_after: IggyDuration::from_str("5s").unwrap(), + max_retries: Some(5), + reconnection_enabled: true, + reconnection_interval: IggyDuration::from_str("2s").unwrap(), + }; + let mut core = ProtocolCore::new(config); + + core.connection_retry_count = 3; + let (count, max_str, interval_str) = core.get_retry_info(); + + assert_eq!(count, 3); + assert_eq!(max_str, "5"); + assert_eq!(interval_str, "2s"); + } + + #[test] + fn test_get_retry_info_unlimited() { + let config = ProtocolCoreConfig { + auto_login: AutoLogin::Disabled, + reestablish_after: IggyDuration::from_str("5s").unwrap(), + max_retries: None, + reconnection_enabled: true, + reconnection_interval: IggyDuration::from_str("1s").unwrap(), + }; + let core = ProtocolCore::new(config); + + let (_, max_str, _) = core.get_retry_info(); + assert_eq!(max_str, "unlimited"); + } +} diff --git a/core/sdk/src/protocol/mod.rs b/core/sdk/src/protocol/mod.rs new file mode 100644 index 00000000..90d7cc36 --- /dev/null +++ b/core/sdk/src/protocol/mod.rs @@ -0,0 +1 @@ +pub(crate) mod core; diff --git a/core/sdk/src/tcp/mod.rs b/core/sdk/src/tcp/mod.rs index c073ff0e..e98605c4 100644 --- a/core/sdk/src/tcp/mod.rs +++ b/core/sdk/src/tcp/mod.rs @@ -22,3 +22,5 @@ pub(crate) mod tcp_connection_stream_kind; pub(crate) mod tcp_stream; pub(crate) mod tcp_tls_connection_stream; pub(crate) mod tcp_tls_verifier; + +pub mod tcp_client_sync; diff --git a/core/sdk/src/tcp/tcp_client_sync.rs b/core/sdk/src/tcp/tcp_client_sync.rs new file mode 100644 index 00000000..c60cb8f5 --- /dev/null +++ b/core/sdk/src/tcp/tcp_client_sync.rs @@ -0,0 +1,862 @@ +use std::{ + fmt::Debug, + io::{IoSlice, Read, Write}, + mem::MaybeUninit, + net::SocketAddr, + ops::DerefMut, + str::FromStr, + sync::{Arc, Mutex}, +}; + +use async_broadcast::{self, Receiver, Sender, broadcast}; +use async_trait::async_trait; +use bytes::{BufMut, Bytes, BytesMut}; +use iggy_binary_protocol::{BinaryClient, BinaryTransport, Client}; +use iggy_common::{ + AutoLogin, ClientState, Command, ConnectionString, ConnectionStringUtils, + DiagnosticEvent, IggyDuration, IggyError, IggyTimestamp, TcpClientConfig, TcpConnectionStringOptions, + TransportProtocol, +}; +use tracing::{debug, error, trace}; + +use crate::{ + connection::transport::{ClientConfig, Transport, TcpTransport, TcpTlsTransport}, + protocol::core::{ControlAction, ProtocolCore, ProtocolCoreConfig, TxBuf}, +}; + +#[derive(Debug)] +pub struct TcpClientSync<T> +where + T: Transport + Debug, + T::Config: ClientConfig, +{ + pub(crate) stream_factory: Arc<T>, + pub(crate) config: Arc<T::Config>, + inner: Mutex<ProtocolCore>, + stream: Mutex<Option<T::Stream>>, + events: (Sender<DiagnosticEvent>, Receiver<DiagnosticEvent>), + recv_buffer: Mutex<BytesMut>, + client_address: Mutex<Option<SocketAddr>>, + connected_at: Mutex<Option<IggyTimestamp>>, +} + +impl<T> TcpClientSync<T> +where + T: Transport + Debug, + T::Config: ClientConfig, +{ + pub fn create(stream_factory: Arc<T>, config: Arc<T::Config>) -> Result<Self, IggyError> { + let proto_config = ProtocolCoreConfig { + auto_login: config.auto_login(), + reestablish_after: config.reconnection_reestablish_after(), + max_retries: config.reconnection_max_retries(), + reconnection_enabled: config.reconnection_enabled(), + reconnection_interval: config.reconnection_interval(), + }; + let (tx, rx) = broadcast(1000); + + Ok(Self { + stream_factory, + config, + inner: Mutex::new(ProtocolCore::new(proto_config)), + stream: Mutex::new(None), + recv_buffer: Mutex::new(BytesMut::with_capacity(16 * 1024)), + events: (tx, rx), + client_address: Mutex::new(None), + connected_at: Mutex::new(None), + }) + } + + async fn get_client_address_value(&self) -> String { + let client_address = self.client_address.lock().unwrap(); + if let Some(client_address) = &*client_address { + client_address.to_string() + } else { + "unknown".to_string() + } + } +} + +#[async_trait] +impl<T> Client for TcpClientSync<T> +where + T: Transport + Send + Sync + 'static + Debug, + T::Config: Send + Sync + Debug, + T::Stream: Send + Sync + Debug, +{ + async fn connect(&self) -> Result<(), IggyError> { + let address = self.config.server_address(); + let config = self.config.clone(); + + let stream = { + let mut core = self.inner.lock().unwrap(); + let mut recv_buf = self.recv_buffer.lock().unwrap(); + Self::connect(&mut core, address, config, &mut recv_buf)? + }; + + if let Some(stream) = stream { + *self.stream.lock().unwrap() = Some(stream); + + let now = IggyTimestamp::now(); + *self.connected_at.lock().unwrap() = Some(now); + + self.publish_event(DiagnosticEvent::Connected).await; + + let client_address = self.get_client_address_value().await; + debug!("TcpClientSync client: {client_address} has connected to server at: {now}"); + } + + Ok(()) + } + + async fn disconnect(&self) -> Result<(), IggyError> { + if self.get_state().await == ClientState::Disconnected { + return Ok(()); + } + + let client_address = self.get_client_address_value().await; + debug!("TcpClientSync client: {client_address} is disconnecting from server..."); + + // Scope the mutex guards to ensure they're dropped before any await + { + let mut core = self.inner.lock().unwrap(); + core.disconnect(); + *self.stream.lock().unwrap() = None; + } + + self.publish_event(DiagnosticEvent::Disconnected).await; + + let now = IggyTimestamp::now(); + debug!("TcpClientSync client: {client_address} has disconnected from server at: {now}."); + Ok(()) + } + + async fn shutdown(&self) -> Result<(), IggyError> { + if self.get_state().await == ClientState::Shutdown { + return Ok(()); + } + + { + let mut core = self.inner.lock().unwrap(); + let stream = self.stream.lock().unwrap().take(); + if let Some(mut stream) = stream { + if let Err(e) = T::shutdown(&mut stream) { + error!("Failed to shutdown stream gracefully: {}", e); + } + } + core.shutdown(); + } + + self.publish_event(DiagnosticEvent::Shutdown).await; + Ok(()) + } + + async fn subscribe_events(&self) -> async_broadcast::Receiver<iggy_common::DiagnosticEvent> { + self.events.1.clone() + } +} + +#[async_trait] +impl<T> BinaryTransport for TcpClientSync<T> +where + T: Transport + Send + Sync + 'static + Debug, + T::Config: Send + Sync + Debug, + T::Stream: Send + Sync + Debug, +{ + async fn get_state(&self) -> ClientState { + self.inner.lock().unwrap().state + } + + async fn set_state(&self, client_state: ClientState) { + let mut core = self.inner.lock().unwrap(); + core.state = client_state + } + + async fn publish_event(&self, event: DiagnosticEvent) { + if let Err(error) = self.events.0.broadcast(event).await { + error!("Failed to send a TCP diagnostic event: {error}"); + } + } + + fn get_heartbeat_interval(&self) -> IggyDuration { + self.config.heartbeat_interval() + } + + async fn send_with_response<C: Command>(&self, command: &C) -> Result<Bytes, IggyError> { + command.validate()?; + self.send_raw_with_response(command.code(), command.to_bytes()) + .await + } + + async fn send_raw_with_response(&self, code: u32, payload: Bytes) -> Result<Bytes, IggyError> { + let result = self.send_raw(code, payload.clone()).await; + if result.is_ok() { + return result; + } + + let error = result.unwrap_err(); + + // Check if we should attempt reconnection using ProtocolCore logic + let should_reconnect = { + let core = self.inner.lock().unwrap(); + core.should_reconnect_for_error(&error) + }; + + if !should_reconnect { + return Err(error); + } + + // Perform reconnection + let server_address = self.config.server_address(); + { + let mut core = self.inner.lock().unwrap(); + core.initiate_reconnection(server_address)?; + } + + // Attempt to reconnect + self.connect().await?; + self.send_raw(code, payload).await + } +} + +impl<T> BinaryClient for TcpClientSync<T> +where + T: Transport + Send + Sync + 'static + Debug, + T::Config: Send + Sync + Debug, + T::Stream: Send + Sync + Debug, +{ +} + +impl<T> TcpClientSync<T> +where + T: Transport + Debug, +{ + fn connect( + core: &mut ProtocolCore, + address: SocketAddr, + config: Arc<T::Config>, + recv_buf: &mut BytesMut, + ) -> Result<Option<T::Stream>, IggyError> { + let current_state = core.state; + if matches!( + current_state, + ClientState::Connected | ClientState::Authenticating | ClientState::Authenticated + ) { + debug!( + "TcpClientSync: Already connected (state: {:?}), completing waiter immediately", + current_state + ); + return Ok(None); + } + if matches!(current_state, ClientState::Connecting) { + debug!("TcpClientSync: Already connecting, adding to waiters"); + return Ok(None); + } + + core.desire_connect(address).map_err(|e| { + error!("TcpClientSync: desire_connect failed: {}", e.as_string()); + e + })?; + + let mut stream; + loop { + match core.poll() { + ControlAction::Connect(addr) => match T::connect(config.clone(), addr) { + Ok(s) => { + core.on_connected()?; + stream = s; + break; + } + Err(_) => { + core.disconnect(); + return Err(IggyError::CannotEstablishConnection) + } + }, + ControlAction::Wait(duration) => { + std::thread::sleep(duration.get_duration()); + } + ControlAction::Error(err) => return Err(err), + ControlAction::Noop | ControlAction::Authenticate { .. } => return Ok(None), + } + } + debug!("TcpClientSync: Connection established"); + + if !core.should_wait_auth() { + return Ok(Some(stream)); + } + + let tx = core + .poll_transmit() + .ok_or(IggyError::IncorrectConnectionState)?; + write::<T>(&mut stream, tx)?; + + trace!("Sent AUTH, waiting for a response..."); + + let _ = read::<T>(&mut stream, core, recv_buf); + let _ = match core.take_auth_result() { + Some(res) => res, + None => Err(IggyError::IncorrectConnectionState), + }?; + + Ok(Some(stream)) + } + + async fn send_raw(&self, code: u32, payload: Bytes) -> Result<Bytes, IggyError> { + let mut core = self.inner.lock().unwrap(); + let mut stream = self.stream.lock().unwrap(); + let mut recv_buf = self.recv_buffer.lock().unwrap(); + let s = match stream.deref_mut() { + Some(s) => s, + // TODO add error trace + None => return Err(IggyError::IncorrectConnectionState), + }; + + core.send(code, payload)?; + if let Some(tx) = core.poll_transmit() { + write::<T>(s, tx)?; + return read::<T>(s, core.deref_mut(), &mut recv_buf); + } + Ok(Bytes::new()) + } +} + +fn write<T: Transport>(stream: &mut T::Stream, tx: TxBuf) -> Result<(), IggyError> { + let mut off = 0usize; + let total = tx.total_len(); + + while off < total { + let mut slices = [IoSlice::new(&[]), IoSlice::new(&[])]; + let iov = if off < 8 { + slices[0] = IoSlice::new(&tx.header[off..]); + if !tx.payload.is_empty() { + slices[1] = IoSlice::new(&tx.payload); + &slices[..2] + } else { + &slices[..1] + } + } else { + let body_off = off - 8; + slices[0] = IoSlice::new(&tx.payload[body_off..]); + &slices[..1] + }; + + let n = stream + .write_vectored(iov) + .map_err(|_| IggyError::CannotEstablishConnection)?; + if n == 0 { + return Err(IggyError::CannotEstablishConnection); + } + off += n; + } + + stream + .flush() + .map_err(|_| IggyError::CannotEstablishConnection)?; + Ok(()) +} + +fn read<T: Transport>( + stream: &mut T::Stream, + core: &mut ProtocolCore, + recv_buf: &mut BytesMut, +) -> Result<Bytes, IggyError> { + let mut result: Option<Result<Bytes, IggyError>> = None; + loop { + if recv_buf.spare_capacity_mut().is_empty() { + recv_buf.reserve(8192); + } + + let spare: &mut [MaybeUninit<u8>] = recv_buf.spare_capacity_mut(); + + let buf: &mut [u8] = unsafe { &mut *(spare as *mut [MaybeUninit<u8>] as *mut [u8]) }; + + let n = match stream.read(buf) { + Ok(0) => { + core.disconnect(); + return Err(IggyError::CannotEstablishConnection); + } + Ok(n) => n, + Err(e) => { + error!("TcpClientSync: read error: {}", e); + return Err(IggyError::CannotEstablishConnection); + } + }; + + unsafe { + recv_buf.advance_mut(n); + } + + core.process_incoming_with(recv_buf, |_, status: u32, payload| { + if status == 0 { + result = Some(Ok(payload)); + } else { + result = Some(Err(IggyError::from_code(status))); + } + }); + if result.is_some() { + return result.unwrap(); + } + } +} + +// Specific implementations for TCP +impl TcpClientSync<TcpTransport> { + /// Create a new TCP client for the provided server address. + pub fn new( + server_address: &str, + auto_sign_in: AutoLogin, + heartbeat_interval: IggyDuration, + ) -> Result<Self, IggyError> { + Self::create( + Arc::new(TcpTransport), + Arc::new(TcpClientConfig { + heartbeat_interval, + server_address: server_address.to_string(), + auto_login: auto_sign_in, + ..Default::default() + }), + ) + } + + /// Create a new TCP client from the provided connection string. + pub fn from_connection_string(connection_string: &str) -> Result<Self, IggyError> { + if ConnectionStringUtils::parse_protocol(connection_string)? != TransportProtocol::Tcp { + return Err(IggyError::InvalidConnectionString); + } + + Self::create( + Arc::new(TcpTransport), + Arc::new( + ConnectionString::<TcpConnectionStringOptions>::from_str(connection_string)?.into(), + ), + ) + } + + /// Create a new TCP client based on the provided configuration. + pub fn create_tcp(config: Arc<TcpClientConfig>) -> Result<Self, IggyError> { + Self::create(Arc::new(TcpTransport), config) + } +} + +// Type aliases for convenience +pub type TcpClientSyncTcp = TcpClientSync<TcpTransport>; +pub type TcpClientSyncTls = TcpClientSync<TcpTlsTransport>; + +impl TcpClientSyncTls { + /// Create a new TLS TCP client from the provided connection string. + pub fn from_connection_string_tls(connection_string: &str) -> Result<Self, IggyError> { + if ConnectionStringUtils::parse_protocol(connection_string)? != TransportProtocol::Tcp { + return Err(IggyError::InvalidConnectionString); + } + + Self::create( + Arc::new(TcpTlsTransport), + Arc::new( + ConnectionString::<TcpConnectionStringOptions>::from_str(connection_string)?.into(), + ), + ) + } + + /// Create a new TLS TCP client based on the provided configuration. + pub fn create_tcp_tls(config: Arc<TcpClientConfig>) -> Result<Self, IggyError> { + Self::create(Arc::new(TcpTlsTransport), config) + } + + /// Create a new TLS TCP client for the provided server address. + pub fn new_tls( + server_address: &str, + domain: &str, + auto_sign_in: AutoLogin, + heartbeat_interval: IggyDuration, + ) -> Result<Self, IggyError> { + Self::create( + Arc::new(TcpTlsTransport), + Arc::new(TcpClientConfig { + heartbeat_interval, + server_address: server_address.to_string(), + tls_enabled: true, + tls_domain: domain.to_string(), + auto_login: auto_sign_in, + ..Default::default() + }), + ) + } +} + +impl Default for TcpClientSync<TcpTransport> { + fn default() -> Self { + TcpClientSync::create_tcp(Arc::new(TcpClientConfig::default())).unwrap() + } +} + +/// Unit tests for TcpClientSync. +/// Tests connection string parsing, configuration validation, and constructors. +#[cfg(test)] +mod tests { + use super::*; + use iggy_common::Credentials; + + #[test] + fn should_fail_with_empty_connection_string() { + let value = ""; + let tcp_client = TcpClientSync::<TcpTransport>::from_connection_string(value); + assert!(tcp_client.is_err()); + } + + #[test] + fn should_fail_without_username() { + let connection_string_prefix = "iggy+"; + let protocol = TransportProtocol::Tcp; + let server_address = "127.0.0.1"; + let port = "1234"; + let username = ""; + let password = "secret"; + let value = format!( + "{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}" + ); + let tcp_client = TcpClientSync::<TcpTransport>::from_connection_string(&value); + assert!(tcp_client.is_err()); + } + + #[test] + fn should_fail_without_password() { + let connection_string_prefix = "iggy+"; + let protocol = TransportProtocol::Tcp; + let server_address = "127.0.0.1"; + let port = "1234"; + let username = "user"; + let password = ""; + let value = format!( + "{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}" + ); + let tcp_client = TcpClientSync::<TcpTransport>::from_connection_string(&value); + assert!(tcp_client.is_err()); + } + + #[test] + fn should_fail_without_server_address() { + let connection_string_prefix = "iggy+"; + let protocol = TransportProtocol::Tcp; + let server_address = ""; + let port = "1234"; + let username = "user"; + let password = "secret"; + let value = format!( + "{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}" + ); + let tcp_client = TcpClientSync::<TcpTransport>::from_connection_string(&value); + assert!(tcp_client.is_err()); + } + + #[test] + fn should_fail_without_port() { + let connection_string_prefix = "iggy+"; + let protocol = TransportProtocol::Tcp; + let server_address = "127.0.0.1"; + let port = ""; + let username = "user"; + let password = "secret"; + let value = format!( + "{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}" + ); + let tcp_client = TcpClientSync::<TcpTransport>::from_connection_string(&value); + assert!(tcp_client.is_err()); + } + + #[test] + fn should_fail_with_invalid_prefix() { + let connection_string_prefix = "invalid+"; + let protocol = TransportProtocol::Tcp; + let server_address = "127.0.0.1"; + let port = "1234"; + let username = "user"; + let password = "secret"; + let value = format!( + "{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}" + ); + let tcp_client = TcpClientSync::<TcpTransport>::from_connection_string(&value); + assert!(tcp_client.is_err()); + } + + #[test] + fn should_fail_with_unmatch_protocol() { + let connection_string_prefix = "iggy+"; + let protocol = TransportProtocol::Quic; + let server_address = "127.0.0.1"; + let port = "1234"; + let username = "user"; + let password = "secret"; + let value = format!( + "{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}" + ); + let tcp_client = TcpClientSync::<TcpTransport>::from_connection_string(&value); + assert!(tcp_client.is_err()); + } + + #[test] + fn should_succeed_with_default_prefix() { + let default_connection_string_prefix = "iggy://"; + let server_address = "127.0.0.1"; + let port = "1234"; + let username = "user"; + let password = "secret"; + let value = format!( + "{default_connection_string_prefix}{username}:{password}@{server_address}:{port}" + ); + let tcp_client = TcpClientSync::<TcpTransport>::from_connection_string(&value); + assert!(tcp_client.is_ok()); + } + + #[test] + fn should_fail_with_invalid_options() { + let connection_string_prefix = "iggy+"; + let protocol = TransportProtocol::Tcp; + let server_address = "127.0.0.1"; + let port = ""; + let username = "user"; + let password = "secret"; + let value = format!( + "{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}?invalid_option=invalid" + ); + let tcp_client = TcpClientSync::<TcpTransport>::from_connection_string(&value); + assert!(tcp_client.is_err()); + } + + #[test] + fn should_succeed_without_options() { + let connection_string_prefix = "iggy+"; + let protocol = TransportProtocol::Tcp; + let server_address = "127.0.0.1"; + let port = "1234"; + let username = "user"; + let password = "secret"; + let value = format!( + "{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}" + ); + let tcp_client = TcpClientSync::<TcpTransport>::from_connection_string(&value); + assert!(tcp_client.is_ok()); + + let tcp_client_sync = tcp_client.unwrap(); + assert_eq!( + tcp_client_sync.config.server_address, + format!("{server_address}:{port}") + ); + assert_eq!( + tcp_client_sync.config.auto_login, + AutoLogin::Enabled(Credentials::UsernamePassword( + username.to_string(), + password.to_string() + )) + ); + + assert!(!tcp_client_sync.config.tls_enabled); + assert!(tcp_client_sync.config.tls_domain.is_empty()); + assert!(tcp_client_sync.config.tls_ca_file.is_none()); + assert_eq!( + tcp_client_sync.config.heartbeat_interval, + IggyDuration::from_str("5s").unwrap() + ); + + assert!(tcp_client_sync.config.reconnection.enabled); + assert!(tcp_client_sync.config.reconnection.max_retries.is_none()); + assert_eq!( + tcp_client_sync.config.reconnection.interval, + IggyDuration::from_str("1s").unwrap() + ); + assert_eq!( + tcp_client_sync.config.reconnection.reestablish_after, + IggyDuration::from_str("5s").unwrap() + ); + } + + #[test] + fn should_succeed_with_options() { + let connection_string_prefix = "iggy+"; + let protocol = TransportProtocol::Tcp; + let server_address = "127.0.0.1"; + let port = "1234"; + let username = "user"; + let password = "secret"; + let heartbeat_interval = "10s"; + let reconnection_retries = "10"; + let value = format!( + "{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}?heartbeat_interval={heartbeat_interval}&reconnection_retries={reconnection_retries}" + ); + let tcp_client = TcpClientSync::<TcpTransport>::from_connection_string(&value); + assert!(tcp_client.is_ok()); + + let tcp_client_sync = tcp_client.unwrap(); + assert_eq!( + tcp_client_sync.config.server_address, + format!("{server_address}:{port}") + ); + assert_eq!( + tcp_client_sync.config.auto_login, + AutoLogin::Enabled(Credentials::UsernamePassword( + username.to_string(), + password.to_string() + )) + ); + + assert!(!tcp_client_sync.config.tls_enabled); + assert!(tcp_client_sync.config.tls_domain.is_empty()); + assert!(tcp_client_sync.config.tls_ca_file.is_none()); + assert_eq!( + tcp_client_sync.config.heartbeat_interval, + IggyDuration::from_str(heartbeat_interval).unwrap() + ); + + assert!(tcp_client_sync.config.reconnection.enabled); + assert_eq!( + tcp_client_sync.config.reconnection.max_retries.unwrap(), + reconnection_retries.parse::<u32>().unwrap() + ); + assert_eq!( + tcp_client_sync.config.reconnection.interval, + IggyDuration::from_str("1s").unwrap() + ); + assert_eq!( + tcp_client_sync.config.reconnection.reestablish_after, + IggyDuration::from_str("5s").unwrap() + ); + } + + #[test] + fn should_succeed_with_pat() { + let connection_string_prefix = "iggy+"; + let protocol = TransportProtocol::Tcp; + let server_address = "127.0.0.1"; + let port = "1234"; + let pat = "iggypat-1234567890abcdef"; + let value = format!("{connection_string_prefix}{protocol}://{pat}@{server_address}:{port}"); + let tcp_client = TcpClientSync::<TcpTransport>::from_connection_string(&value); + assert!(tcp_client.is_ok()); + + let tcp_client_sync = tcp_client.unwrap(); + assert_eq!( + tcp_client_sync.config.server_address, + format!("{server_address}:{port}") + ); + assert_eq!( + tcp_client_sync.config.auto_login, + AutoLogin::Enabled(Credentials::PersonalAccessToken(pat.to_string())) + ); + + assert!(!tcp_client_sync.config.tls_enabled); + assert!(tcp_client_sync.config.tls_domain.is_empty()); + assert!(tcp_client_sync.config.tls_ca_file.is_none()); + assert_eq!( + tcp_client_sync.config.heartbeat_interval, + IggyDuration::from_str("5s").unwrap() + ); + + assert!(tcp_client_sync.config.reconnection.enabled); + assert!(tcp_client_sync.config.reconnection.max_retries.is_none()); + assert_eq!( + tcp_client_sync.config.reconnection.interval, + IggyDuration::from_str("1s").unwrap() + ); + assert_eq!( + tcp_client_sync.config.reconnection.reestablish_after, + IggyDuration::from_str("5s").unwrap() + ); + } + + #[test] + fn should_create_tcp_client_with_new() { + let server_address = "127.0.0.1:8080"; + let auto_login = AutoLogin::Enabled(Credentials::UsernamePassword( + "user".to_string(), + "pass".to_string(), + )); + let heartbeat_interval = IggyDuration::from_str("10s").unwrap(); + + let tcp_client = TcpClientSync::<TcpTransport>::new( + server_address, + auto_login.clone(), + heartbeat_interval, + ); + assert!(tcp_client.is_ok()); + + let client = tcp_client.unwrap(); + assert_eq!(client.config.server_address, server_address); + assert_eq!(client.config.auto_login, auto_login); + assert_eq!(client.config.heartbeat_interval, heartbeat_interval); + assert!(!client.config.tls_enabled); + } + + #[test] + fn should_create_tls_client_with_new_tls() { + let server_address = "127.0.0.1:8080"; + let domain = "localhost"; + let auto_login = AutoLogin::Enabled(Credentials::UsernamePassword( + "user".to_string(), + "pass".to_string(), + )); + let heartbeat_interval = IggyDuration::from_str("10s").unwrap(); + + let tcp_client = TcpClientSyncTls::new_tls( + server_address, + domain, + auto_login.clone(), + heartbeat_interval, + ); + assert!(tcp_client.is_ok()); + + let client = tcp_client.unwrap(); + assert_eq!(client.config.server_address, server_address); + assert_eq!(client.config.auto_login, auto_login); + assert_eq!(client.config.heartbeat_interval, heartbeat_interval); + assert!(client.config.tls_enabled); + assert_eq!(client.config.tls_domain, domain); + } + + #[test] + fn should_create_tcp_client_with_config() { + let config = Arc::new(TcpClientConfig { + server_address: "127.0.0.1:8080".to_string(), + auto_login: AutoLogin::Enabled(Credentials::PersonalAccessToken("token".to_string())), + tls_enabled: false, + ..Default::default() + }); + + let tcp_client = TcpClientSync::<TcpTransport>::create_tcp(config.clone()); + assert!(tcp_client.is_ok()); + + let client = tcp_client.unwrap(); + assert_eq!(client.config.server_address, config.server_address); + assert_eq!(client.config.auto_login, config.auto_login); + assert!(!client.config.tls_enabled); + } + + #[test] + fn should_create_tls_client_with_config() { + let config = Arc::new(TcpClientConfig { + server_address: "127.0.0.1:8080".to_string(), + auto_login: AutoLogin::Enabled(Credentials::PersonalAccessToken("token".to_string())), + tls_enabled: true, + tls_domain: "localhost".to_string(), + ..Default::default() + }); + + let tcp_client = TcpClientSyncTls::create_tcp_tls(config.clone()); + assert!(tcp_client.is_ok()); + + let client = tcp_client.unwrap(); + assert_eq!(client.config.server_address, config.server_address); + assert_eq!(client.config.auto_login, config.auto_login); + assert!(client.config.tls_enabled); + assert_eq!(client.config.tls_domain, config.tls_domain); + } + + #[test] + fn should_handle_default_client() { + let client = TcpClientSync::<TcpTransport>::default(); + let default_config = TcpClientConfig::default(); + + assert_eq!(client.config.server_address, default_config.server_address); + assert_eq!(client.config.auto_login, default_config.auto_login); + assert_eq!(client.config.heartbeat_interval, default_config.heartbeat_interval); + assert_eq!(client.config.tls_enabled, default_config.tls_enabled); + } +}
