This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch add-sync-client
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 031d7de97639cb79a50525042d08b06cc92c1dd5
Author: haze518 <[email protected]>
AuthorDate: Fri Sep 19 07:51:52 2025 +0600

    complete sync client
---
 core/common/src/error/iggy_error.rs                |   6 +
 core/integration/src/lib.rs                        |   1 +
 core/integration/src/tcp_client_sync.rs            |  88 +++
 core/integration/tests/sdk/mod.rs                  |   1 +
 core/integration/tests/sdk/tcp_client_sync.rs      | 274 +++++++
 core/sdk/src/client_wrappers/binary_client.rs      |   4 +
 .../binary_consumer_group_client.rs                |  29 +
 .../binary_consumer_offset_client.rs               |  15 +
 .../src/client_wrappers/binary_message_client.rs   |  23 +
 .../src/client_wrappers/binary_partition_client.rs |  10 +
 .../binary_personal_access_token_client.rs         |   4 +
 .../src/client_wrappers/binary_segment_client.rs   |   5 +
 .../src/client_wrappers/binary_stream_client.rs    |   6 +
 .../src/client_wrappers/binary_system_client.rs    |   7 +
 .../sdk/src/client_wrappers/binary_topic_client.rs |  31 +
 core/sdk/src/client_wrappers/binary_user_client.rs |  17 +
 core/sdk/src/client_wrappers/client_wrapper.rs     |   1 +
 core/sdk/src/connection/mod.rs                     |   1 +
 core/sdk/src/connection/transport.rs               | 178 +++++
 core/sdk/src/lib.rs                                |   2 +
 core/sdk/src/prelude.rs                            |   1 +
 core/sdk/src/protocol/core.rs                      | 796 +++++++++++++++++++
 core/sdk/src/protocol/mod.rs                       |   1 +
 core/sdk/src/tcp/mod.rs                            |   2 +
 core/sdk/src/tcp/tcp_client_sync.rs                | 862 +++++++++++++++++++++
 25 files changed, 2365 insertions(+)

diff --git a/core/common/src/error/iggy_error.rs 
b/core/common/src/error/iggy_error.rs
index cc8a284b..2597e154 100644
--- a/core/common/src/error/iggy_error.rs
+++ b/core/common/src/error/iggy_error.rs
@@ -459,6 +459,12 @@ pub enum IggyError {
     CannotReadIndexPosition = 10011,
     #[error("Cannot read index timestamp")]
     CannotReadIndexTimestamp = 10012,
+    #[error("Max number of retry has exceeded")]
+    MaxRetriesExceeded = 10050,
+    #[error("Connection missed socket")]
+    ConnectionMissedSocket = 10051,
+    #[error("Incorrect connection state")]
+    IncorrectConnectionState = 10052,
 }
 
 impl IggyError {
diff --git a/core/integration/src/lib.rs b/core/integration/src/lib.rs
index 88c51ac1..1b35d43b 100644
--- a/core/integration/src/lib.rs
+++ b/core/integration/src/lib.rs
@@ -24,6 +24,7 @@ pub mod http_client;
 pub mod quic_client;
 #[allow(deprecated)]
 pub mod tcp_client;
+pub mod tcp_client_sync;
 #[allow(deprecated)]
 pub mod test_mcp_server;
 #[allow(deprecated)]
diff --git a/core/integration/src/tcp_client_sync.rs 
b/core/integration/src/tcp_client_sync.rs
new file mode 100644
index 00000000..e885686c
--- /dev/null
+++ b/core/integration/src/tcp_client_sync.rs
@@ -0,0 +1,88 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::test_server::{ClientFactory, Transport};
+use async_trait::async_trait;
+use iggy::prelude::{ClientWrapper, TcpClientConfig, TcpClientSyncTcp, 
TcpClientSyncTls};
+use std::sync::Arc;
+
+#[derive(Debug, Clone, Default)]
+pub struct TcpClientSyncFactory {
+    pub server_addr: String,
+    pub nodelay: bool,
+    pub tls_enabled: bool,
+    pub tls_domain: String,
+    pub tls_ca_file: Option<String>,
+    pub tls_validate_certificate: bool,
+}
+
+#[async_trait]
+impl ClientFactory for TcpClientSyncFactory {
+    async fn create_client(&self) -> ClientWrapper {
+        let config = TcpClientConfig {
+            server_address: self.server_addr.clone(),
+            nodelay: self.nodelay,
+            tls_enabled: self.tls_enabled,
+            tls_domain: self.tls_domain.clone(),
+            tls_ca_file: self.tls_ca_file.clone(),
+            tls_validate_certificate: self.tls_validate_certificate,
+            ..TcpClientConfig::default()
+        };
+
+        let client: Box<dyn iggy_binary_protocol::Client> = if 
self.tls_enabled {
+            
Box::new(TcpClientSyncTls::create_tcp_tls(Arc::new(config)).unwrap_or_else(|e| {
+                panic!(
+                    "Failed to create TcpClientSyncTls, iggy-server has 
address {}, error: {:?}",
+                    self.server_addr, e
+                )
+            }))
+        } else {
+            
Box::new(TcpClientSyncTcp::create_tcp(Arc::new(config)).unwrap_or_else(|e| {
+                panic!(
+                    "Failed to create TcpClientSyncTcp, iggy-server has 
address {}, error: {:?}",
+                    self.server_addr, e
+                )
+            }))
+        };
+
+        // Connect the client
+        client.connect().await.unwrap_or_else(|e| {
+            if self.tls_enabled {
+                panic!(
+                    "Failed to connect to iggy-server at {} with TLS enabled, 
error: {:?}\\n\\\n                    Hint: Make sure the server is started 
with TLS enabled and self-signed certificate:\\n\\\n                    
IGGY_TCP_TLS_ENABLED=true IGGY_TCP_TLS_SELF_SIGNED=true\\n\n                    
or start iggy-bench with relevant tcp tls arguments: --tls --tls-domain 
<domain> --tls-ca-file <ca_file>\\n",
+                    self.server_addr, e
+                )
+            } else {
+                panic!(
+                    "Failed to connect to iggy-server at {}, error: {:?}",
+                    self.server_addr, e
+                )
+            }
+        });
+
+        ClientWrapper::TcpSync(client)
+    }
+
+    fn transport(&self) -> Transport {
+        Transport::Tcp
+    }
+
+    fn server_addr(&self) -> String {
+        self.server_addr.clone()
+    }
+}
\ No newline at end of file
diff --git a/core/integration/tests/sdk/mod.rs 
b/core/integration/tests/sdk/mod.rs
index 6d94bfa6..26796a62 100644
--- a/core/integration/tests/sdk/mod.rs
+++ b/core/integration/tests/sdk/mod.rs
@@ -17,3 +17,4 @@
  */
 
 mod producer;
+mod tcp_client_sync;
diff --git a/core/integration/tests/sdk/tcp_client_sync.rs 
b/core/integration/tests/sdk/tcp_client_sync.rs
new file mode 100644
index 00000000..7c79d65a
--- /dev/null
+++ b/core/integration/tests/sdk/tcp_client_sync.rs
@@ -0,0 +1,274 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::sync::Arc;
+
+use iggy::prelude::*;
+use integration::test_server::{TestServer, IpAddrKind, ClientFactory};
+use integration::tcp_client_sync::TcpClientSyncFactory;
+use serial_test::serial;
+use bytes::Bytes;
+
+#[tokio::test]
+#[serial]
+async fn should_connect_and_ping_tcp_sync_client() {
+    let mut test_server = TestServer::default();
+    test_server.start();
+
+    let tcp_client_config = TcpClientConfig {
+        server_address: test_server.get_raw_tcp_addr().unwrap(),
+        ..TcpClientConfig::default()
+    };
+    let client = 
ClientWrapper::TcpSync(Box::new(TcpClientSyncTcp::create_tcp(Arc::new(tcp_client_config)).unwrap()));
+    let client = IggyClient::create(client, None, None);
+
+    client.connect().await.unwrap();
+    assert!(client.ping().await.is_ok(), "Failed to ping server");
+    
+    // let factory = TcpClientSyncFactory {
+    //     server_addr: test_server.get_raw_tcp_addr().unwrap(),
+    //     nodelay: false,
+    //     tls_enabled: false,
+    //     tls_domain: "localhost".to_string(),
+    //     tls_ca_file: None,
+    //     tls_validate_certificate: false,
+    // };
+    
+    // let client_wrapper = factory.create_client().await;
+    
+    // Test direct ping on the wrapper (which should already be connected)
+    // match &client_wrapper {
+    //     ClientWrapper::TcpSync(sync_client) => {
+    //         sync_client.ping().await.unwrap();
+            
+    //         // Test login as root user
+    //         sync_client
+    //             .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+    //             .await
+    //             .unwrap();
+    //     }
+    //     _ => panic!("Expected TcpSync client"),
+    // }
+    
+    test_server.stop();
+}
+
+#[tokio::test]
+#[serial]
+async fn should_perform_basic_streaming_operations() {
+    let mut server = TestServer::new(None, true, None, IpAddrKind::V4);
+    server.start();
+    
+    let factory = TcpClientSyncFactory {
+        server_addr: server.get_raw_tcp_addr().unwrap(),
+        nodelay: false,
+        tls_enabled: false,
+        tls_domain: "localhost".to_string(),
+        tls_ca_file: None,
+        tls_validate_certificate: false,
+    };
+    
+    let client_wrapper = factory.create_client().await;
+    let client = IggyClient::create(client_wrapper, None, None);
+    
+    // Connect first
+    client.connect().await.unwrap();
+    
+    // Login as root
+    client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    
+    let stream_id = 1;
+    let topic_id = 1;
+    let stream_name = "test-stream";
+    let topic_name = "test-topic";
+    
+    // Create stream
+    let stream = client
+        .create_stream(stream_name, Some(stream_id))
+        .await
+        .unwrap();
+    assert_eq!(stream.id, stream_id);
+    assert_eq!(stream.name, stream_name);
+    
+    // Create topic
+    let topic = client
+        .create_topic(
+            &Identifier::numeric(stream_id).unwrap(),
+            topic_name,
+            1, // partitions_count
+            Default::default(), // compression_algorithm
+            None, // replication_factor
+            Some(topic_id),
+            IggyExpiry::NeverExpire, // message_expiry
+            MaxTopicSize::ServerDefault, // max_topic_size
+        )
+        .await
+        .unwrap();
+    assert_eq!(topic.id, topic_id);
+    assert_eq!(topic.name, topic_name);
+    
+    // Send a message
+    let payload = Bytes::from("Hello, World!");
+    let mut message = IggyMessage::builder()
+        .payload(payload.clone())
+        .build()
+        .unwrap();
+    client
+        .send_messages(
+            &Identifier::numeric(stream_id).unwrap(),
+            &Identifier::numeric(topic_id).unwrap(),
+            &Partitioning::partition_id(1),
+            &mut [message],
+        )
+        .await
+        .unwrap();
+    
+    // Poll the message
+    let consumer = Consumer::default();
+    let polled = client
+        .poll_messages(
+            &Identifier::numeric(stream_id).unwrap(),
+            &Identifier::numeric(topic_id).unwrap(),
+            Some(1), // partition_id
+            &consumer,
+            &PollingStrategy::next(),
+            1, // count
+            false, // auto_commit
+        )
+        .await
+        .unwrap();
+    
+    assert_eq!(polled.messages.len(), 1);
+    assert_eq!(
+        polled.messages[0].payload,
+        payload
+    );
+    
+    server.stop();
+}
+
+#[tokio::test] 
+#[serial]
+async fn should_handle_authentication() {
+    let mut server = TestServer::new(None, true, None, IpAddrKind::V4);
+    server.start();
+    
+    let factory = TcpClientSyncFactory {
+        server_addr: server.get_raw_tcp_addr().unwrap(),
+        nodelay: false,
+        tls_enabled: false,
+        tls_domain: "localhost".to_string(),
+        tls_ca_file: None,
+        tls_validate_certificate: false,
+    };
+    
+    let client_wrapper = factory.create_client().await;
+    let client = IggyClient::create(client_wrapper, None, None);
+    
+    // Connect first
+    client.connect().await.unwrap();
+    
+    // Test login as root user
+    let identity = client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    
+    assert_eq!(identity.user_id, DEFAULT_ROOT_USER_ID);
+    
+    // Test getting current user info
+    let me = client.get_me().await.unwrap();
+    assert_eq!(me.user_id, Some(DEFAULT_ROOT_USER_ID));
+    
+    // Test logout
+    client.logout_user().await.unwrap();
+    
+    server.stop();
+}
+
+#[tokio::test]
+#[serial] 
+async fn should_handle_connection_lifecycle() {
+    let mut server = TestServer::new(None, true, None, IpAddrKind::V4);
+    server.start();
+    
+    let factory = TcpClientSyncFactory {
+        server_addr: server.get_raw_tcp_addr().unwrap(),
+        nodelay: false,
+        tls_enabled: false,
+        tls_domain: "localhost".to_string(),
+        tls_ca_file: None,
+        tls_validate_certificate: false,
+    };
+    
+    let client_wrapper = factory.create_client().await;
+    let client = IggyClient::create(client_wrapper, None, None);
+    
+    // Connect first
+    client.connect().await.unwrap();
+    
+    // Test basic ping after connection
+    client.ping().await.unwrap();
+    
+    // Test getting server stats
+    let stats = client.get_stats().await.unwrap();
+    assert!(stats.clients_count > 0);
+    
+    server.stop();
+}
+
+#[tokio::test]
+#[serial]
+async fn should_connect_with_tls() {
+    let mut server = TestServer::new(
+        Some([("IGGY_TCP_TLS_ENABLED".to_string(), 
"true".to_string())].into()),
+        true,
+        None,
+        IpAddrKind::V4,
+    );
+    server.start();
+    
+    let factory = TcpClientSyncFactory {
+        server_addr: server.get_raw_tcp_addr().unwrap(),
+        nodelay: false,
+        tls_enabled: true,
+        tls_domain: "localhost".to_string(),
+        tls_ca_file: None,
+        tls_validate_certificate: false,
+    };
+    
+    let client_wrapper = factory.create_client().await;
+    let client = IggyClient::create(client_wrapper, None, None);
+
+    // Connect first
+    client.connect().await.unwrap();
+    
+    // Test basic ping with TLS
+    client.ping().await.unwrap();
+    
+    // Test login with TLS
+    client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    
+    server.stop();
+}
\ No newline at end of file
diff --git a/core/sdk/src/client_wrappers/binary_client.rs 
b/core/sdk/src/client_wrappers/binary_client.rs
index 41d5744e..757d16a7 100644
--- a/core/sdk/src/client_wrappers/binary_client.rs
+++ b/core/sdk/src/client_wrappers/binary_client.rs
@@ -29,6 +29,7 @@ impl Client for ClientWrapper {
             ClientWrapper::Iggy(client) => client.connect().await,
             ClientWrapper::Http(client) => client.connect().await,
             ClientWrapper::Tcp(client) => client.connect().await,
+            ClientWrapper::TcpSync(client) => client.connect().await,
             ClientWrapper::Quic(client) => client.connect().await,
         }
     }
@@ -38,6 +39,7 @@ impl Client for ClientWrapper {
             ClientWrapper::Iggy(client) => client.disconnect().await,
             ClientWrapper::Http(client) => client.disconnect().await,
             ClientWrapper::Tcp(client) => client.disconnect().await,
+            ClientWrapper::TcpSync(client) => client.disconnect().await,
             ClientWrapper::Quic(client) => client.disconnect().await,
         }
     }
@@ -47,6 +49,7 @@ impl Client for ClientWrapper {
             ClientWrapper::Iggy(client) => client.shutdown().await,
             ClientWrapper::Http(client) => client.shutdown().await,
             ClientWrapper::Tcp(client) => client.shutdown().await,
+            ClientWrapper::TcpSync(client) => client.shutdown().await,
             ClientWrapper::Quic(client) => client.shutdown().await,
         }
     }
@@ -56,6 +59,7 @@ impl Client for ClientWrapper {
             ClientWrapper::Iggy(client) => client.subscribe_events().await,
             ClientWrapper::Http(client) => client.subscribe_events().await,
             ClientWrapper::Tcp(client) => client.subscribe_events().await,
+            ClientWrapper::TcpSync(client) => client.subscribe_events().await,
             ClientWrapper::Quic(client) => client.subscribe_events().await,
         }
     }
diff --git a/core/sdk/src/client_wrappers/binary_consumer_group_client.rs 
b/core/sdk/src/client_wrappers/binary_consumer_group_client.rs
index 18117588..976be37c 100644
--- a/core/sdk/src/client_wrappers/binary_consumer_group_client.rs
+++ b/core/sdk/src/client_wrappers/binary_consumer_group_client.rs
@@ -46,6 +46,11 @@ impl ConsumerGroupClient for ClientWrapper {
                     .get_consumer_group(stream_id, topic_id, group_id)
                     .await
             }
+            ClientWrapper::TcpSync(client) => {
+                client
+                    .get_consumer_group(stream_id, topic_id, group_id)
+                    .await
+            }
             ClientWrapper::Quic(client) => {
                 client
                     .get_consumer_group(stream_id, topic_id, group_id)
@@ -63,6 +68,7 @@ impl ConsumerGroupClient for ClientWrapper {
             ClientWrapper::Iggy(client) => 
client.get_consumer_groups(stream_id, topic_id).await,
             ClientWrapper::Http(client) => 
client.get_consumer_groups(stream_id, topic_id).await,
             ClientWrapper::Tcp(client) => 
client.get_consumer_groups(stream_id, topic_id).await,
+            ClientWrapper::TcpSync(client) => 
client.get_consumer_groups(stream_id, topic_id).await,
             ClientWrapper::Quic(client) => 
client.get_consumer_groups(stream_id, topic_id).await,
         }
     }
@@ -90,6 +96,11 @@ impl ConsumerGroupClient for ClientWrapper {
                     .create_consumer_group(stream_id, topic_id, name, group_id)
                     .await
             }
+            ClientWrapper::TcpSync(client) => {
+                client
+                    .create_consumer_group(stream_id, topic_id, name, group_id)
+                    .await
+            }
             ClientWrapper::Quic(client) => {
                 client
                     .create_consumer_group(stream_id, topic_id, name, group_id)
@@ -120,6 +131,11 @@ impl ConsumerGroupClient for ClientWrapper {
                     .delete_consumer_group(stream_id, topic_id, group_id)
                     .await
             }
+            ClientWrapper::TcpSync(client) => {
+                client
+                    .delete_consumer_group(stream_id, topic_id, group_id)
+                    .await
+            }
             ClientWrapper::Quic(client) => {
                 client
                     .delete_consumer_group(stream_id, topic_id, group_id)
@@ -150,6 +166,11 @@ impl ConsumerGroupClient for ClientWrapper {
                     .join_consumer_group(stream_id, topic_id, group_id)
                     .await
             }
+            ClientWrapper::TcpSync(client) => {
+                client
+                    .join_consumer_group(stream_id, topic_id, group_id)
+                    .await
+            }
             ClientWrapper::Quic(client) => {
                 client
                     .join_consumer_group(stream_id, topic_id, group_id)
@@ -180,6 +201,11 @@ impl ConsumerGroupClient for ClientWrapper {
                     .leave_consumer_group(stream_id, topic_id, group_id)
                     .await
             }
+            ClientWrapper::TcpSync(client) => {
+                client
+                    .leave_consumer_group(stream_id, topic_id, group_id)
+                    .await
+            }
             ClientWrapper::Quic(client) => {
                 client
                     .leave_consumer_group(stream_id, topic_id, group_id)
@@ -202,6 +228,9 @@ impl AsyncDrop for ClientWrapper {
             ClientWrapper::Tcp(client) => {
                 let _ = client.logout_user().await;
             }
+            ClientWrapper::TcpSync(client) => {
+                let _ = client.logout_user().await;
+            }
             ClientWrapper::Quic(client) => {
                 let _ = client.logout_user().await;
             }
diff --git a/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs 
b/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs
index 668f120b..51c6ea2b 100644
--- a/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs
+++ b/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs
@@ -47,6 +47,11 @@ impl ConsumerOffsetClient for ClientWrapper {
                     .store_consumer_offset(consumer, stream_id, topic_id, 
partition_id, offset)
                     .await
             }
+            ClientWrapper::TcpSync(client) => {
+                client
+                    .store_consumer_offset(consumer, stream_id, topic_id, 
partition_id, offset)
+                    .await
+            }
             ClientWrapper::Quic(client) => {
                 client
                     .store_consumer_offset(consumer, stream_id, topic_id, 
partition_id, offset)
@@ -78,6 +83,11 @@ impl ConsumerOffsetClient for ClientWrapper {
                     .get_consumer_offset(consumer, stream_id, topic_id, 
partition_id)
                     .await
             }
+            ClientWrapper::TcpSync(client) => {
+                client
+                    .get_consumer_offset(consumer, stream_id, topic_id, 
partition_id)
+                    .await
+            }
             ClientWrapper::Quic(client) => {
                 client
                     .get_consumer_offset(consumer, stream_id, topic_id, 
partition_id)
@@ -109,6 +119,11 @@ impl ConsumerOffsetClient for ClientWrapper {
                     .delete_consumer_offset(consumer, stream_id, topic_id, 
partition_id)
                     .await
             }
+            ClientWrapper::TcpSync(client) => {
+                client
+                    .delete_consumer_offset(consumer, stream_id, topic_id, 
partition_id)
+                    .await
+            }
             ClientWrapper::Quic(client) => {
                 client
                     .delete_consumer_offset(consumer, stream_id, topic_id, 
partition_id)
diff --git a/core/sdk/src/client_wrappers/binary_message_client.rs 
b/core/sdk/src/client_wrappers/binary_message_client.rs
index 01e0c166..98db2924 100644
--- a/core/sdk/src/client_wrappers/binary_message_client.rs
+++ b/core/sdk/src/client_wrappers/binary_message_client.rs
@@ -75,6 +75,19 @@ impl MessageClient for ClientWrapper {
                     )
                     .await
             }
+            ClientWrapper::TcpSync(client) => {
+                client
+                    .poll_messages(
+                        stream_id,
+                        topic_id,
+                        partition_id,
+                        consumer,
+                        strategy,
+                        count,
+                        auto_commit,
+                    )
+                    .await
+            }
             ClientWrapper::Quic(client) => {
                 client
                     .poll_messages(
@@ -114,6 +127,11 @@ impl MessageClient for ClientWrapper {
                     .send_messages(stream_id, topic_id, partitioning, messages)
                     .await
             }
+            ClientWrapper::TcpSync(client) => {
+                client
+                    .send_messages(stream_id, topic_id, partitioning, messages)
+                    .await
+            }
             ClientWrapper::Quic(client) => {
                 client
                     .send_messages(stream_id, topic_id, partitioning, messages)
@@ -145,6 +163,11 @@ impl MessageClient for ClientWrapper {
                     .flush_unsaved_buffer(stream_id, topic_id, 
partitioning_id, fsync)
                     .await
             }
+            ClientWrapper::TcpSync(client) => {
+                client
+                    .flush_unsaved_buffer(stream_id, topic_id, 
partitioning_id, fsync)
+                    .await
+            }
             ClientWrapper::Quic(client) => {
                 client
                     .flush_unsaved_buffer(stream_id, topic_id, 
partitioning_id, fsync)
diff --git a/core/sdk/src/client_wrappers/binary_partition_client.rs 
b/core/sdk/src/client_wrappers/binary_partition_client.rs
index 26ccad1d..ceee0ad8 100644
--- a/core/sdk/src/client_wrappers/binary_partition_client.rs
+++ b/core/sdk/src/client_wrappers/binary_partition_client.rs
@@ -50,6 +50,11 @@ impl PartitionClient for ClientWrapper {
                     .create_partitions(stream_id, topic_id, partitions_count)
                     .await
             }
+            ClientWrapper::TcpSync(client) => {
+                client
+                    .create_partitions(stream_id, topic_id, partitions_count)
+                    .await
+            }
         }
     }
 
@@ -80,6 +85,11 @@ impl PartitionClient for ClientWrapper {
                     .delete_partitions(stream_id, topic_id, partitions_count)
                     .await
             }
+            ClientWrapper::TcpSync(client) => {
+                client
+                    .delete_partitions(stream_id, topic_id, partitions_count)
+                    .await
+            }
         }
     }
 }
diff --git 
a/core/sdk/src/client_wrappers/binary_personal_access_token_client.rs 
b/core/sdk/src/client_wrappers/binary_personal_access_token_client.rs
index 58c6fbed..98dd829f 100644
--- a/core/sdk/src/client_wrappers/binary_personal_access_token_client.rs
+++ b/core/sdk/src/client_wrappers/binary_personal_access_token_client.rs
@@ -32,6 +32,7 @@ impl PersonalAccessTokenClient for ClientWrapper {
             ClientWrapper::Http(client) => 
client.get_personal_access_tokens().await,
             ClientWrapper::Tcp(client) => 
client.get_personal_access_tokens().await,
             ClientWrapper::Quic(client) => 
client.get_personal_access_tokens().await,
+            ClientWrapper::TcpSync(client) => 
client.get_personal_access_tokens().await,
         }
     }
 
@@ -45,6 +46,7 @@ impl PersonalAccessTokenClient for ClientWrapper {
             ClientWrapper::Http(client) => 
client.create_personal_access_token(name, expiry).await,
             ClientWrapper::Tcp(client) => 
client.create_personal_access_token(name, expiry).await,
             ClientWrapper::Quic(client) => 
client.create_personal_access_token(name, expiry).await,
+            ClientWrapper::TcpSync(client) => 
client.create_personal_access_token(name, expiry).await,
         }
     }
 
@@ -54,6 +56,7 @@ impl PersonalAccessTokenClient for ClientWrapper {
             ClientWrapper::Http(client) => 
client.delete_personal_access_token(name).await,
             ClientWrapper::Tcp(client) => 
client.delete_personal_access_token(name).await,
             ClientWrapper::Quic(client) => 
client.delete_personal_access_token(name).await,
+            ClientWrapper::TcpSync(client) => 
client.delete_personal_access_token(name).await,
         }
     }
 
@@ -66,6 +69,7 @@ impl PersonalAccessTokenClient for ClientWrapper {
             ClientWrapper::Http(client) => 
client.login_with_personal_access_token(token).await,
             ClientWrapper::Tcp(client) => 
client.login_with_personal_access_token(token).await,
             ClientWrapper::Quic(client) => 
client.login_with_personal_access_token(token).await,
+            ClientWrapper::TcpSync(client) => 
client.login_with_personal_access_token(token).await,
         }
     }
 }
diff --git a/core/sdk/src/client_wrappers/binary_segment_client.rs 
b/core/sdk/src/client_wrappers/binary_segment_client.rs
index 0ec082ff..53845df0 100644
--- a/core/sdk/src/client_wrappers/binary_segment_client.rs
+++ b/core/sdk/src/client_wrappers/binary_segment_client.rs
@@ -51,6 +51,11 @@ impl SegmentClient for ClientWrapper {
                     .delete_segments(stream_id, topic_id, partition_id, 
segments_count)
                     .await
             }
+            ClientWrapper::TcpSync(client) => {
+                client
+                    .delete_segments(stream_id, topic_id, partition_id, 
segments_count)
+                    .await
+            }
         }
     }
 }
diff --git a/core/sdk/src/client_wrappers/binary_stream_client.rs 
b/core/sdk/src/client_wrappers/binary_stream_client.rs
index 93d785dd..37ee448c 100644
--- a/core/sdk/src/client_wrappers/binary_stream_client.rs
+++ b/core/sdk/src/client_wrappers/binary_stream_client.rs
@@ -29,6 +29,7 @@ impl StreamClient for ClientWrapper {
             ClientWrapper::Http(client) => client.get_stream(stream_id).await,
             ClientWrapper::Tcp(client) => client.get_stream(stream_id).await,
             ClientWrapper::Quic(client) => client.get_stream(stream_id).await,
+            ClientWrapper::TcpSync(client) => 
client.get_stream(stream_id).await,
         }
     }
 
@@ -38,6 +39,7 @@ impl StreamClient for ClientWrapper {
             ClientWrapper::Http(client) => client.get_streams().await,
             ClientWrapper::Tcp(client) => client.get_streams().await,
             ClientWrapper::Quic(client) => client.get_streams().await,
+            ClientWrapper::TcpSync(client) => client.get_streams().await,
         }
     }
 
@@ -51,6 +53,7 @@ impl StreamClient for ClientWrapper {
             ClientWrapper::Http(client) => client.create_stream(name, 
stream_id).await,
             ClientWrapper::Tcp(client) => client.create_stream(name, 
stream_id).await,
             ClientWrapper::Quic(client) => client.create_stream(name, 
stream_id).await,
+            ClientWrapper::TcpSync(client) => client.create_stream(name, 
stream_id).await,
         }
     }
 
@@ -60,6 +63,7 @@ impl StreamClient for ClientWrapper {
             ClientWrapper::Http(client) => client.update_stream(stream_id, 
name).await,
             ClientWrapper::Tcp(client) => client.update_stream(stream_id, 
name).await,
             ClientWrapper::Quic(client) => client.update_stream(stream_id, 
name).await,
+            ClientWrapper::TcpSync(client) => client.update_stream(stream_id, 
name).await,
         }
     }
 
@@ -69,6 +73,7 @@ impl StreamClient for ClientWrapper {
             ClientWrapper::Http(client) => 
client.delete_stream(stream_id).await,
             ClientWrapper::Tcp(client) => 
client.delete_stream(stream_id).await,
             ClientWrapper::Quic(client) => 
client.delete_stream(stream_id).await,
+            ClientWrapper::TcpSync(client) => 
client.delete_stream(stream_id).await,
         }
     }
 
@@ -78,6 +83,7 @@ impl StreamClient for ClientWrapper {
             ClientWrapper::Http(client) => 
client.purge_stream(stream_id).await,
             ClientWrapper::Tcp(client) => client.purge_stream(stream_id).await,
             ClientWrapper::Quic(client) => 
client.purge_stream(stream_id).await,
+            ClientWrapper::TcpSync(client) => 
client.purge_stream(stream_id).await,
         }
     }
 }
diff --git a/core/sdk/src/client_wrappers/binary_system_client.rs 
b/core/sdk/src/client_wrappers/binary_system_client.rs
index 825dbd8e..af69c08f 100644
--- a/core/sdk/src/client_wrappers/binary_system_client.rs
+++ b/core/sdk/src/client_wrappers/binary_system_client.rs
@@ -32,6 +32,7 @@ impl SystemClient for ClientWrapper {
             ClientWrapper::Http(client) => client.get_stats().await,
             ClientWrapper::Tcp(client) => client.get_stats().await,
             ClientWrapper::Quic(client) => client.get_stats().await,
+            ClientWrapper::TcpSync(client) => client.get_stats().await,
         }
     }
 
@@ -41,6 +42,7 @@ impl SystemClient for ClientWrapper {
             ClientWrapper::Http(client) => client.get_me().await,
             ClientWrapper::Tcp(client) => client.get_me().await,
             ClientWrapper::Quic(client) => client.get_me().await,
+            ClientWrapper::TcpSync(client) => client.get_me().await,
         }
     }
 
@@ -50,6 +52,7 @@ impl SystemClient for ClientWrapper {
             ClientWrapper::Http(client) => client.get_client(client_id).await,
             ClientWrapper::Tcp(client) => client.get_client(client_id).await,
             ClientWrapper::Quic(client) => client.get_client(client_id).await,
+            ClientWrapper::TcpSync(client) => 
client.get_client(client_id).await,
         }
     }
 
@@ -59,6 +62,7 @@ impl SystemClient for ClientWrapper {
             ClientWrapper::Http(client) => client.get_clients().await,
             ClientWrapper::Tcp(client) => client.get_clients().await,
             ClientWrapper::Quic(client) => client.get_clients().await,
+            ClientWrapper::TcpSync(client) => client.get_clients().await,
         }
     }
 
@@ -68,6 +72,7 @@ impl SystemClient for ClientWrapper {
             ClientWrapper::Http(client) => client.ping().await,
             ClientWrapper::Tcp(client) => client.ping().await,
             ClientWrapper::Quic(client) => client.ping().await,
+            ClientWrapper::TcpSync(client) => client.ping().await,
         }
     }
 
@@ -77,6 +82,7 @@ impl SystemClient for ClientWrapper {
             ClientWrapper::Http(client) => client.heartbeat_interval().await,
             ClientWrapper::Tcp(client) => client.heartbeat_interval().await,
             ClientWrapper::Quic(client) => client.heartbeat_interval().await,
+            ClientWrapper::TcpSync(client) => 
client.heartbeat_interval().await,
         }
     }
 
@@ -90,6 +96,7 @@ impl SystemClient for ClientWrapper {
             ClientWrapper::Http(client) => client.snapshot(compression, 
snapshot_types).await,
             ClientWrapper::Tcp(client) => client.snapshot(compression, 
snapshot_types).await,
             ClientWrapper::Quic(client) => client.snapshot(compression, 
snapshot_types).await,
+            ClientWrapper::TcpSync(client) => client.snapshot(compression, 
snapshot_types).await,
         }
     }
 }
diff --git a/core/sdk/src/client_wrappers/binary_topic_client.rs 
b/core/sdk/src/client_wrappers/binary_topic_client.rs
index c9685bdb..ae6e733e 100644
--- a/core/sdk/src/client_wrappers/binary_topic_client.rs
+++ b/core/sdk/src/client_wrappers/binary_topic_client.rs
@@ -35,6 +35,7 @@ impl TopicClient for ClientWrapper {
             ClientWrapper::Http(client) => client.get_topic(stream_id, 
topic_id).await,
             ClientWrapper::Tcp(client) => client.get_topic(stream_id, 
topic_id).await,
             ClientWrapper::Quic(client) => client.get_topic(stream_id, 
topic_id).await,
+            ClientWrapper::TcpSync(client) => client.get_topic(stream_id, 
topic_id).await,
         }
     }
 
@@ -44,6 +45,7 @@ impl TopicClient for ClientWrapper {
             ClientWrapper::Http(client) => client.get_topics(stream_id).await,
             ClientWrapper::Tcp(client) => client.get_topics(stream_id).await,
             ClientWrapper::Quic(client) => client.get_topics(stream_id).await,
+            ClientWrapper::TcpSync(client) => 
client.get_topics(stream_id).await,
         }
     }
 
@@ -115,6 +117,20 @@ impl TopicClient for ClientWrapper {
                     )
                     .await
             }
+            ClientWrapper::TcpSync(client) => {
+                client
+                    .create_topic(
+                        stream_id,
+                        name,
+                        partitions_count,
+                        compression_algorithm,
+                        replication_factor,
+                        topic_id,
+                        message_expiry,
+                        max_topic_size,
+                    )
+                    .await
+            }
         }
     }
 
@@ -181,6 +197,19 @@ impl TopicClient for ClientWrapper {
                     )
                     .await
             }
+            ClientWrapper::TcpSync(client) => {
+                client
+                    .update_topic(
+                        stream_id,
+                        topic_id,
+                        name,
+                        compression_algorithm,
+                        replication_factor,
+                        message_expiry,
+                        max_topic_size,
+                    )
+                    .await
+            }
         }
     }
 
@@ -194,6 +223,7 @@ impl TopicClient for ClientWrapper {
             ClientWrapper::Http(client) => client.delete_topic(stream_id, 
topic_id).await,
             ClientWrapper::Tcp(client) => client.delete_topic(stream_id, 
topic_id).await,
             ClientWrapper::Quic(client) => client.delete_topic(stream_id, 
topic_id).await,
+            ClientWrapper::TcpSync(client) => client.delete_topic(stream_id, 
topic_id).await,
         }
     }
 
@@ -207,6 +237,7 @@ impl TopicClient for ClientWrapper {
             ClientWrapper::Http(client) => client.purge_topic(stream_id, 
topic_id).await,
             ClientWrapper::Tcp(client) => client.purge_topic(stream_id, 
topic_id).await,
             ClientWrapper::Quic(client) => client.purge_topic(stream_id, 
topic_id).await,
+            ClientWrapper::TcpSync(client) => client.purge_topic(stream_id, 
topic_id).await,
         }
     }
 }
diff --git a/core/sdk/src/client_wrappers/binary_user_client.rs 
b/core/sdk/src/client_wrappers/binary_user_client.rs
index f7289c24..d05174f4 100644
--- a/core/sdk/src/client_wrappers/binary_user_client.rs
+++ b/core/sdk/src/client_wrappers/binary_user_client.rs
@@ -31,6 +31,7 @@ impl UserClient for ClientWrapper {
             ClientWrapper::Http(client) => client.get_user(user_id).await,
             ClientWrapper::Tcp(client) => client.get_user(user_id).await,
             ClientWrapper::Quic(client) => client.get_user(user_id).await,
+            ClientWrapper::TcpSync(client) => client.get_user(user_id).await,
         }
     }
 
@@ -40,6 +41,7 @@ impl UserClient for ClientWrapper {
             ClientWrapper::Http(client) => client.get_users().await,
             ClientWrapper::Tcp(client) => client.get_users().await,
             ClientWrapper::Quic(client) => client.get_users().await,
+            ClientWrapper::TcpSync(client) => client.get_users().await,
         }
     }
 
@@ -71,6 +73,11 @@ impl UserClient for ClientWrapper {
                     .create_user(username, password, status, permissions)
                     .await
             }
+            ClientWrapper::TcpSync(client) => {
+                client
+                    .create_user(username, password, status, permissions)
+                    .await
+            }
         }
     }
 
@@ -80,6 +87,7 @@ impl UserClient for ClientWrapper {
             ClientWrapper::Tcp(client) => client.delete_user(user_id).await,
             ClientWrapper::Quic(client) => client.delete_user(user_id).await,
             ClientWrapper::Iggy(client) => client.delete_user(user_id).await,
+            ClientWrapper::TcpSync(client) => 
client.delete_user(user_id).await,
         }
     }
 
@@ -94,6 +102,7 @@ impl UserClient for ClientWrapper {
             ClientWrapper::Tcp(client) => client.update_user(user_id, 
username, status).await,
             ClientWrapper::Quic(client) => client.update_user(user_id, 
username, status).await,
             ClientWrapper::Iggy(client) => client.update_user(user_id, 
username, status).await,
+            ClientWrapper::TcpSync(client) => client.update_user(user_id, 
username, status).await,
         }
     }
 
@@ -107,6 +116,7 @@ impl UserClient for ClientWrapper {
             ClientWrapper::Http(client) => client.update_permissions(user_id, 
permissions).await,
             ClientWrapper::Tcp(client) => client.update_permissions(user_id, 
permissions).await,
             ClientWrapper::Quic(client) => client.update_permissions(user_id, 
permissions).await,
+            ClientWrapper::TcpSync(client) => 
client.update_permissions(user_id, permissions).await,
         }
     }
 
@@ -137,6 +147,11 @@ impl UserClient for ClientWrapper {
                     .change_password(user_id, current_password, new_password)
                     .await
             }
+            ClientWrapper::TcpSync(client) => {
+                client
+                    .change_password(user_id, current_password, new_password)
+                    .await
+            }
         }
     }
 
@@ -146,6 +161,7 @@ impl UserClient for ClientWrapper {
             ClientWrapper::Http(client) => client.login_user(username, 
password).await,
             ClientWrapper::Tcp(client) => client.login_user(username, 
password).await,
             ClientWrapper::Quic(client) => client.login_user(username, 
password).await,
+            ClientWrapper::TcpSync(client) => client.login_user(username, 
password).await,
         }
     }
 
@@ -155,6 +171,7 @@ impl UserClient for ClientWrapper {
             ClientWrapper::Http(client) => client.logout_user().await,
             ClientWrapper::Tcp(client) => client.logout_user().await,
             ClientWrapper::Quic(client) => client.logout_user().await,
+            ClientWrapper::TcpSync(client) => client.logout_user().await,
         }
     }
 }
diff --git a/core/sdk/src/client_wrappers/client_wrapper.rs 
b/core/sdk/src/client_wrappers/client_wrapper.rs
index d391bfda..12f758f5 100644
--- a/core/sdk/src/client_wrappers/client_wrapper.rs
+++ b/core/sdk/src/client_wrappers/client_wrapper.rs
@@ -27,5 +27,6 @@ pub enum ClientWrapper {
     Iggy(IggyClient),
     Http(HttpClient),
     Tcp(TcpClient),
+    TcpSync(Box<dyn iggy_binary_protocol::Client>),
     Quic(QuicClient),
 }
diff --git a/core/sdk/src/connection/mod.rs b/core/sdk/src/connection/mod.rs
new file mode 100644
index 00000000..bfc7a330
--- /dev/null
+++ b/core/sdk/src/connection/mod.rs
@@ -0,0 +1 @@
+pub mod transport;
diff --git a/core/sdk/src/connection/transport.rs 
b/core/sdk/src/connection/transport.rs
new file mode 100644
index 00000000..b2ec7304
--- /dev/null
+++ b/core/sdk/src/connection/transport.rs
@@ -0,0 +1,178 @@
+use std::io::{Read, Write};
+use std::net::{SocketAddr, TcpStream};
+use std::str::FromStr;
+use std::sync::Arc;
+
+use tracing::error;
+use iggy_common::{AutoLogin, IggyDuration, IggyError, TcpClientConfig};
+use rustls::pki_types::pem::PemObject;
+use rustls::pki_types::{CertificateDer, ServerName};
+use rustls::{ClientConnection, StreamOwned};
+
+pub trait ClientConfig {
+    fn server_address(&self) -> SocketAddr;
+    fn auto_login(&self) -> AutoLogin;
+    fn reconnection_reestablish_after(&self) -> IggyDuration;
+    fn reconnection_max_retries(&self) -> Option<u32>;
+    fn reconnection_enabled(&self) -> bool;
+    fn reconnection_interval(&self) -> IggyDuration;
+    fn heartbeat_interval(&self) -> IggyDuration;
+}
+
+impl ClientConfig for TcpClientConfig {
+    fn auto_login(&self) -> AutoLogin {
+        self.auto_login.clone()
+    }
+
+    fn heartbeat_interval(&self) -> IggyDuration {
+        self.heartbeat_interval
+    }
+
+    fn reconnection_max_retries(&self) -> Option<u32> {
+        self.reconnection.max_retries
+    }
+
+    fn reconnection_reestablish_after(&self) -> IggyDuration {
+        self.reconnection.reestablish_after
+    }
+
+    fn reconnection_enabled(&self) -> bool {
+        self.reconnection.enabled
+    }
+
+    fn reconnection_interval(&self) -> IggyDuration {
+        self.reconnection.interval
+    }
+
+    fn server_address(&self) -> SocketAddr {
+        SocketAddr::from_str(&self.server_address).unwrap()
+    }
+}
+
+pub trait Transport {
+    type Stream: Read + Write;
+    type Config: ClientConfig;
+
+    fn connect(
+        cfg: Arc<Self::Config>, // TODO maybe should remove arc
+        server_address: SocketAddr,
+    ) -> Result<Self::Stream, IggyError>;
+    
+    fn shutdown(stream: &mut Self::Stream) -> Result<(), IggyError>;
+}
+
+#[derive(Debug)]
+pub struct TcpTransport;
+
+impl Transport for TcpTransport {
+    type Stream = TcpStream;
+    type Config = TcpClientConfig;
+
+    fn connect(
+            cfg: Arc<Self::Config>,
+            server_address: SocketAddr,
+        ) -> Result<Self::Stream, IggyError> {
+            let stream = TcpStream::connect(server_address).map_err(|e| {
+                error!("Failed to establish a TCP connection to the server: 
{e}",);
+                IggyError::CannotEstablishConnection
+            })?;
+            if let Err(e) = stream.set_nodelay(cfg.nodelay) {
+                error!("Failed to set the nodelay option on the client: {e}, 
continuing...",);
+            }
+            Ok(stream)
+        }
+    
+    fn shutdown(stream: &mut Self::Stream) -> Result<(), IggyError> {
+        stream.shutdown(std::net::Shutdown::Both).map_err(|e| {
+            error!(
+                "Failed to shutdown the TCP connection to the TCP connection: 
{e}",
+            );
+            IggyError::TcpError
+        })
+    }
+}
+
+#[derive(Debug)]
+pub struct TcpTlsTransport;
+
+impl Transport for TcpTlsTransport {
+    type Stream = StreamOwned<ClientConnection, TcpStream>;
+    type Config = TcpClientConfig;
+
+    fn connect(
+            cfg: Arc<Self::Config>,
+            server_address: SocketAddr,
+        ) -> Result<Self::Stream, IggyError> {
+            let stream = TcpStream::connect(server_address).map_err(|e| {
+                error!("Failed to establish a TLS connection to the server: 
{e}",);
+                IggyError::CannotEstablishConnection
+            })?;
+            if let Err(e) = stream.set_nodelay(cfg.nodelay) {
+                error!("Failed to set the nodelay option on the client: {e}, 
continuing...",);
+            }
+
+            let _ = 
rustls::crypto::aws_lc_rs::default_provider().install_default();
+
+            let config = if cfg.tls_validate_certificate {
+                let mut root_cert_store = rustls::RootCertStore::empty();
+                if let Some(certificate_path) = cfg.tls_ca_file.clone() {
+                    for cert in
+                        
CertificateDer::pem_file_iter(&certificate_path).map_err(|error| {
+                            error!("Failed to read the CA file: 
{certificate_path}. {error}",);
+                            IggyError::InvalidTlsCertificatePath
+                        })?
+                    {
+                        let certificate = cert.map_err(|error| {
+                            error!(
+                                "Failed to read a certificate from the CA 
file: {certificate_path}. {error}",
+                            );
+                            IggyError::InvalidTlsCertificate
+                        })?;
+                        root_cert_store.add(certificate).map_err(|error| {
+                            error!(
+                                "Failed to add a certificate to the root 
certificate store. {error}",
+                            );
+                            IggyError::InvalidTlsCertificate
+                        })?;
+                    }
+                } else {
+                    
root_cert_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
+                }
+
+                rustls::ClientConfig::builder()
+                    .with_root_certificates(root_cert_store)
+                    .with_no_client_auth()
+            } else {
+                use crate::tcp::tcp_tls_verifier::NoServerVerification;
+                rustls::ClientConfig::builder()
+                    .dangerous()
+                    
.with_custom_certificate_verifier(Arc::new(NoServerVerification))
+                    .with_no_client_auth()
+            };
+
+            let tls_domain = cfg.tls_domain.to_owned();
+            let domain = ServerName::try_from(tls_domain).map_err(|error| {
+                error!("Failed to create a server name from the domain. 
{error}",);
+                IggyError::InvalidTlsDomain
+            })?;
+            
+
+            let conn = ClientConnection::new(Arc::new(config), domain)
+                .map_err(|e| {
+                    error!("Failed to establish a TCP/TLS connection to the 
server: {e}",);
+                    IggyError::CannotEstablishConnection
+                })?;
+
+            Ok(StreamOwned::new(conn, stream))
+    }
+
+    fn shutdown(stream: &mut Self::Stream) -> Result<(), IggyError> {
+        stream.conn.send_close_notify();
+        stream.sock.shutdown(std::net::Shutdown::Both).map_err(|e| {
+            error!(
+                "Failed to shutdown the TCP/TLS connection to the TCP 
connection: {e}",
+            );
+            IggyError::TcpError
+        })
+    }
+}
diff --git a/core/sdk/src/lib.rs b/core/sdk/src/lib.rs
index c315fd5e..52e7e118 100644
--- a/core/sdk/src/lib.rs
+++ b/core/sdk/src/lib.rs
@@ -26,3 +26,5 @@ pub mod prelude;
 pub mod quic;
 pub mod stream_builder;
 pub mod tcp;
+pub mod protocol;
+pub mod connection;
diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs
index 1e1ee111..e93112a1 100644
--- a/core/sdk/src/prelude.rs
+++ b/core/sdk/src/prelude.rs
@@ -45,6 +45,7 @@ pub use crate::stream_builder::IggyStreamConsumer;
 pub use crate::stream_builder::{IggyProducerConfig, IggyStreamProducer};
 pub use crate::stream_builder::{IggyStream, IggyStreamConfig};
 pub use crate::tcp::tcp_client::TcpClient;
+pub use crate::tcp::tcp_client_sync::{TcpClientSync, TcpClientSyncTcp, 
TcpClientSyncTls};
 pub use iggy_binary_protocol::{
     Client, ConsumerGroupClient, ConsumerOffsetClient, MessageClient, 
PartitionClient,
     PersonalAccessTokenClient, SegmentClient, StreamClient, SystemClient, 
TopicClient, UserClient,
diff --git a/core/sdk/src/protocol/core.rs b/core/sdk/src/protocol/core.rs
new file mode 100644
index 00000000..31e30d54
--- /dev/null
+++ b/core/sdk/src/protocol/core.rs
@@ -0,0 +1,796 @@
+use std::{
+    collections::VecDeque,
+    net::SocketAddr,
+};
+
+use bytes::{Buf, BufMut, Bytes, BytesMut};
+use iggy_common::{AutoLogin, ClientState, Credentials, IggyDuration, 
IggyError, IggyErrorDiscriminants, IggyTimestamp};
+use tracing::{debug, error, info, warn};
+
+const REQUEST_INITIAL_BYTES_LENGTH: usize = 4;
+const REQUEST_HEADER_BYTES: usize = 8;
+const RESPONSE_INITIAL_BYTES_LENGTH: usize = 8;
+#[derive(Debug)]
+pub struct ProtocolCoreConfig {
+    pub auto_login: AutoLogin,
+    pub reestablish_after: IggyDuration,
+    pub max_retries: Option<u32>,
+    pub reconnection_enabled: bool,
+    pub reconnection_interval: IggyDuration,
+}
+
+#[derive(Debug)]
+pub enum ControlAction {
+    Connect(SocketAddr),
+    Wait(IggyDuration),
+    Authenticate { username: String, password: String },
+    Noop,
+    Error(IggyError),
+}
+
+pub struct TxBuf {
+    pub header: [u8; 8],
+    pub payload: Bytes,
+    pub request_id: u64,
+}
+
+impl TxBuf {
+    #[inline]
+    pub fn total_len(&self) -> usize {
+        REQUEST_HEADER_BYTES + self.payload.len()
+    }
+}
+
+#[derive(Debug)]
+pub struct ProtocolCore {
+    pub state: ClientState,
+    config: ProtocolCoreConfig,
+    last_connect_attempt: Option<IggyTimestamp>,
+    last_reconnection_attempt: Option<IggyTimestamp>,
+    pub retry_count: u32,
+    pub connection_retry_count: u32,
+    next_request_id: u64,
+    pending_sends: VecDeque<(u32, Bytes, u64)>,
+    sent_order: VecDeque<u64>,
+    auth_pending: bool,
+    auth_request_id: Option<u64>,
+    server_address: Option<SocketAddr>,
+    last_auth_result: Option<Result<(), IggyError>>,
+    connection_established: bool,
+}
+
+impl ProtocolCore {
+    pub fn new(config: ProtocolCoreConfig) -> Self {
+        Self {
+            state: ClientState::Disconnected,
+            config,
+            last_connect_attempt: None,
+            last_reconnection_attempt: None,
+            retry_count: 0,
+            connection_retry_count: 0,
+            next_request_id: 1,
+            pending_sends: VecDeque::new(),
+            sent_order: VecDeque::new(),
+            auth_pending: false,
+            auth_request_id: None,
+            server_address: None,
+            last_auth_result: None,
+            connection_established: false,
+        }
+    }
+
+    pub fn poll_transmit(&mut self) -> Option<TxBuf> {
+        if let Some((code, payload, request_id)) = 
self.pending_sends.pop_front() {
+            let total_len = (payload.len() + REQUEST_INITIAL_BYTES_LENGTH) as 
u32;
+            self.sent_order.push_back(request_id);
+
+            Some(TxBuf {
+                payload,
+                header: make_header(total_len, code),
+                request_id,
+            })
+        } else {
+            None
+        }
+    }
+
+    pub fn send(&mut self, code: u32, payload: Bytes) -> Result<u64, 
IggyError> {
+        match self.state {
+            ClientState::Shutdown => Err(IggyError::ClientShutdown),
+            ClientState::Disconnected | ClientState::Connecting => 
Err(IggyError::NotConnected),
+            ClientState::Connected | ClientState::Authenticating | 
ClientState::Authenticated => {
+                Ok(self.queue_send(code, payload))
+            }
+        }
+    }
+
+    fn queue_send(&mut self, code: u32, payload: Bytes) -> u64 {
+        let request_id = self.next_request_id;
+        self.next_request_id += 1;
+        self.pending_sends.push_back((code, payload, request_id));
+        request_id
+    }
+
+    pub fn process_incoming_with<F: FnMut(u64, u32, Bytes)>(
+        &mut self,
+        buf: &mut BytesMut,
+        mut f: F,
+    ) {
+        loop {
+            if buf.len() < RESPONSE_INITIAL_BYTES_LENGTH {
+                break;
+            }
+            let status = u32::from_le_bytes(buf[..4].try_into().unwrap());
+            let length = u32::from_le_bytes(buf[4..8].try_into().unwrap());
+            let total = RESPONSE_INITIAL_BYTES_LENGTH + length as usize;
+            if buf.len() < total {
+                break;
+            }
+
+            buf.advance(RESPONSE_INITIAL_BYTES_LENGTH);
+            let payload = if length <= 1 {
+                Bytes::new()
+            } else {
+                buf.split_to(length as usize).freeze()
+            };
+            
+            // Handle response status logging
+            if status != 0 {
+                self.handle_error_response(status);
+            }
+            
+            if let Some(id) = self.on_response(status) {
+                f(id, status, payload);
+            }
+        }
+    }
+
+    /// Handle error response logging based on status code
+    fn handle_error_response(&self, status: u32) {
+        // TEMP: See https://github.com/apache/iggy/pull/604 for context.
+        if status == IggyErrorDiscriminants::TopicIdAlreadyExists as u32
+            || status == IggyErrorDiscriminants::TopicNameAlreadyExists as u32
+            || status == IggyErrorDiscriminants::StreamIdAlreadyExists as u32
+            || status == IggyErrorDiscriminants::StreamNameAlreadyExists as u32
+            || status == IggyErrorDiscriminants::UserAlreadyExists as u32
+            || status == 
IggyErrorDiscriminants::PersonalAccessTokenAlreadyExists as u32
+            || status == IggyErrorDiscriminants::ConsumerGroupIdAlreadyExists 
as u32
+            || status == 
IggyErrorDiscriminants::ConsumerGroupNameAlreadyExists as u32
+        {
+            debug!(
+                "Received a server resource already exists response: {} ({})",
+                status,
+                IggyError::from_code_as_string(status)
+            )
+        } else {
+            error!(
+                "Received an invalid response with status: {} ({}).",
+                status,
+                IggyError::from_code_as_string(status),
+            );
+        }
+    }
+
+    pub fn on_response(&mut self, status: u32) -> Option<u64> {
+        let request_id = self.sent_order.pop_front()?;
+
+        if Some(request_id) == self.auth_request_id {
+            if status == 0 {
+                debug!("Authentication successful");
+                self.state = ClientState::Authenticated;
+                self.auth_pending = false;
+                self.last_auth_result = Some(Ok(()));
+            } else {
+                warn!("Authentication failed with status: {}", status);
+                self.state = ClientState::Connected;
+                self.auth_pending = false;
+                self.last_auth_result = Some(Err(IggyError::Unauthenticated));
+            }
+            self.auth_request_id = None;
+        }
+
+        Some(request_id)
+    }
+
+    pub fn poll(&mut self) -> ControlAction {
+        match self.state {
+            ClientState::Shutdown => 
ControlAction::Error(IggyError::ClientShutdown),
+            ClientState::Disconnected => ControlAction::Noop,
+            ClientState::Authenticated | ClientState::Authenticating | 
ClientState::Connected => {
+                ControlAction::Noop
+            }
+            ClientState::Connecting => {
+                let server_address = match self.server_address {
+                    Some(addr) => addr,
+                    None => return 
ControlAction::Error(IggyError::ConnectionMissedSocket),
+                };
+
+                // Check if reconnection is enabled
+                if !self.config.reconnection_enabled {
+                    if self.connection_retry_count > 0 {
+                        warn!("Automatic reconnection is disabled.");
+                        return 
ControlAction::Error(IggyError::CannotEstablishConnection);
+                    }
+                }
+
+                // Handle timing for reestablish_after (initial connect timing)
+                if let Some(last_reconnection) = 
self.last_reconnection_attempt {
+                    let now = IggyTimestamp::now();
+                    let elapsed = 
now.as_micros().saturating_sub(last_reconnection.as_micros());
+                    if elapsed < self.config.reestablish_after.as_micros() {
+                        let remaining = 
IggyDuration::from(self.config.reestablish_after.as_micros() - elapsed);
+                        info!(
+                            "Trying to connect to the server in: {remaining}",
+                            remaining = remaining.as_human_time_string()
+                        );
+                        return ControlAction::Wait(remaining);
+                    }
+                }
+
+                // Handle timing for reconnection interval (between retries)
+                if let Some(last_attempt) = self.last_connect_attempt {
+                    let now = IggyTimestamp::now();
+                    let elapsed = 
now.as_micros().saturating_sub(last_attempt.as_micros());
+                    if elapsed < self.config.reconnection_interval.as_micros() 
{
+                        let remaining = 
IggyDuration::from(self.config.reconnection_interval.as_micros() - elapsed);
+                        return ControlAction::Wait(remaining);
+                    }
+                }
+
+                // Check retry limits
+                let unlimited_retries = self.config.max_retries.is_none();
+                let max_retries = self.config.max_retries.unwrap_or_default();
+                
+                if !unlimited_retries && self.connection_retry_count >= 
max_retries {
+                    error!(
+                        "Maximum retry attempts ({}) exceeded for server: 
{:?}",
+                        max_retries, server_address
+                    );
+                    return ControlAction::Error(IggyError::MaxRetriesExceeded);
+                }
+
+                // Increment retry count and log attempt
+                self.connection_retry_count += 1;
+                self.last_connect_attempt = Some(IggyTimestamp::now());
+
+                let max_retries_str = if let Some(max_retries) = 
self.config.max_retries {
+                    max_retries.to_string()
+                } else {
+                    "unlimited".to_string()
+                };
+
+                if self.connection_retry_count > 1 {
+                    let interval_str = 
self.config.reconnection_interval.as_human_time_string();
+                    info!(
+                        "Retrying to connect to server ({}/{max_retries_str}): 
{server_address:?} in: {interval_str}",
+                        self.connection_retry_count,
+                        max_retries_str = max_retries_str,
+                        server_address = server_address,
+                        interval_str = interval_str
+                    );
+                } else {
+                    info!(
+                        "Iggy client is connecting to server: 
{server_address:?}...",
+                        server_address = server_address
+                    );
+                }
+
+                ControlAction::Connect(server_address)
+            }
+        }
+    }
+
+    pub fn desire_connect(&mut self, server_address: SocketAddr) -> Result<(), 
IggyError> {
+        match self.state {
+            ClientState::Shutdown => return Err(IggyError::ClientShutdown),
+            ClientState::Connecting => return Ok(()),
+            ClientState::Connected | ClientState::Authenticating | 
ClientState::Authenticated => {
+                return Ok(());
+            }
+            _ => {
+                self.state = ClientState::Connecting;
+                self.server_address = Some(server_address);
+            }
+        }
+
+        Ok(())
+    }
+
+    pub fn on_connected(&mut self) -> Result<(), IggyError> {
+        debug!("Transport connected");
+        if self.state != ClientState::Connecting {
+            return Err(IggyError::IncorrectConnectionState);
+        }
+        self.state = ClientState::Connected;
+        self.connection_established = true;
+        // Reset retry counters on successful connection
+        self.retry_count = 0;
+        self.connection_retry_count = 0;
+
+        match &self.config.auto_login {
+            AutoLogin::Disabled => {
+                info!("Automatic sign-in is disabled.");
+            }
+            AutoLogin::Enabled(credentials) => {
+                if !self.auth_pending {
+                    self.state = ClientState::Authenticating;
+                    self.auth_pending = true;
+
+                    match credentials {
+                        Credentials::UsernamePassword(username, password) => {
+                            let auth_payload = encode_auth(&username, 
&password);
+                            let auth_id = self.queue_send(0x0A, auth_payload);
+                            self.auth_request_id = Some(auth_id);
+                        }
+                        Credentials::PersonalAccessToken(token) => {
+                            let auth_payload = encode_pat_auth(token);
+                            let auth_id = self.queue_send(0x0B, auth_payload); 
// Different command code for PAT
+                            self.auth_request_id = Some(auth_id);
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    pub fn disconnect(&mut self) {
+        debug!("Transport disconnected");
+        self.state = ClientState::Disconnected;
+        self.auth_pending = false;
+        self.auth_request_id = None;
+        self.sent_order.clear();
+        
+        // Set last reconnection attempt time if we had an established 
connection
+        if self.connection_established {
+            self.last_reconnection_attempt = Some(IggyTimestamp::now());
+            self.connection_established = false;
+        }
+    }
+
+    pub fn shutdown(&mut self) {
+        self.state = ClientState::Shutdown;
+        self.auth_pending = false;
+        self.auth_request_id = None;
+        self.sent_order.clear();
+    }
+
+    pub fn should_wait_auth(&self) -> bool {
+        matches!(self.config.auto_login, AutoLogin::Enabled(_)) && 
self.auth_pending
+    }
+
+    pub fn take_auth_result(&mut self) -> Option<Result<(), IggyError>> {
+        self.last_auth_result.take()
+    }
+
+    /// Check if we should attempt reconnection for the given error
+    pub fn should_reconnect_for_error(&self, error: &IggyError) -> bool {
+        if !self.config.reconnection_enabled {
+            return false;
+        }
+
+        matches!(
+            error,
+            IggyError::Disconnected
+                | IggyError::EmptyResponse
+                | IggyError::Unauthenticated
+                | IggyError::StaleClient
+        )
+    }
+
+    /// Initiate reconnection process
+    pub fn initiate_reconnection(&mut self, server_address: SocketAddr) -> 
Result<(), IggyError> {
+        info!(
+            "Reconnecting to the server: {server_address:?} by client...",
+            server_address = server_address
+        );
+        
+        self.disconnect();
+        self.desire_connect(server_address)
+    }
+
+    /// Get current retry information for logging
+    pub fn get_retry_info(&self) -> (u32, String, String) {
+        let max_retries_str = if let Some(max_retries) = 
self.config.max_retries {
+            max_retries.to_string()
+        } else {
+            "unlimited".to_string()
+        };
+        
+        let interval_str = 
self.config.reconnection_interval.as_human_time_string();
+        
+        (self.connection_retry_count, max_retries_str, interval_str)
+    }
+}
+
+fn encode_auth(username: &str, password: &str) -> Bytes {
+    let mut buf = BytesMut::new();
+    buf.put_u32_le(username.len() as u32);
+    buf.put_slice(username.as_bytes());
+    buf.put_u32_le(password.len() as u32);
+    buf.put_slice(password.as_bytes());
+    buf.freeze()
+}
+
+fn encode_pat_auth(token: &str) -> Bytes {
+    let mut buf = BytesMut::new();
+    buf.put_u32_le(token.len() as u32);
+    buf.put_slice(token.as_bytes());
+    buf.freeze()
+}
+
+fn make_header(total_len: u32, code: u32) -> [u8; 8] {
+    let mut h = [0u8; 8];
+    h[..4].copy_from_slice(&total_len.to_le_bytes());
+    h[4..].copy_from_slice(&code.to_le_bytes());
+    h
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::net::SocketAddr;
+    use std::str::FromStr;
+
+    fn create_test_config() -> ProtocolCoreConfig {
+        ProtocolCoreConfig {
+            auto_login: AutoLogin::Disabled,
+            reestablish_after: IggyDuration::from_str("5s").unwrap(),
+            max_retries: Some(3),
+            reconnection_enabled: true,
+            reconnection_interval: IggyDuration::from_str("1s").unwrap(),
+        }
+    }
+
+    #[test]
+    fn test_protocol_core_creation() {
+        let config = create_test_config();
+        let core = ProtocolCore::new(config);
+        
+        assert_eq!(core.state, ClientState::Disconnected);
+        assert_eq!(core.retry_count, 0);
+        assert_eq!(core.connection_retry_count, 0);
+        assert_eq!(core.next_request_id, 1);
+        assert!(core.pending_sends.is_empty());
+        assert!(core.sent_order.is_empty());
+        assert!(!core.auth_pending);
+        assert!(core.auth_request_id.is_none());
+        assert!(core.server_address.is_none());
+        assert!(core.last_auth_result.is_none());
+        assert!(!core.connection_established);
+    }
+
+    #[test]
+    fn test_state_transitions_connecting() {
+        let config = create_test_config();
+        let mut core = ProtocolCore::new(config);
+        let addr = SocketAddr::from_str("127.0.0.1:8080").unwrap();
+
+        // Test desire_connect
+        core.desire_connect(addr).unwrap();
+        assert_eq!(core.state, ClientState::Connecting);
+        assert_eq!(core.server_address, Some(addr));
+    }
+
+    #[test]
+    fn test_on_connected_success() {
+        let config = create_test_config();
+        let mut core = ProtocolCore::new(config);
+        let addr = SocketAddr::from_str("127.0.0.1:8080").unwrap();
+
+        core.desire_connect(addr).unwrap();
+        core.on_connected().unwrap();
+        
+        assert_eq!(core.state, ClientState::Connected);
+        assert_eq!(core.retry_count, 0);
+        assert_eq!(core.connection_retry_count, 0);
+        assert!(core.connection_established);
+    }
+
+    #[test]
+    fn test_on_connected_wrong_state() {
+        let config = create_test_config();
+        let mut core = ProtocolCore::new(config);
+        
+        // Try to connect without being in Connecting state
+        let result = core.on_connected();
+        assert!(result.is_err());
+        assert_eq!(result.unwrap_err(), IggyError::IncorrectConnectionState);
+    }
+
+    #[test]
+    fn test_disconnect() {
+        let config = create_test_config();
+        let mut core = ProtocolCore::new(config);
+        let addr = SocketAddr::from_str("127.0.0.1:8080").unwrap();
+
+        core.desire_connect(addr).unwrap();
+        core.on_connected().unwrap();
+        
+        core.disconnect();
+        
+        assert_eq!(core.state, ClientState::Disconnected);
+        assert!(!core.auth_pending);
+        assert!(core.auth_request_id.is_none());
+        assert!(core.sent_order.is_empty());
+        assert!(!core.connection_established);
+        assert!(core.last_reconnection_attempt.is_some());
+    }
+
+    #[test]
+    fn test_shutdown() {
+        let config = create_test_config();
+        let mut core = ProtocolCore::new(config);
+        let addr = SocketAddr::from_str("127.0.0.1:8080").unwrap();
+
+        core.desire_connect(addr).unwrap();
+        core.on_connected().unwrap();
+        
+        core.shutdown();
+        
+        assert_eq!(core.state, ClientState::Shutdown);
+        assert!(!core.auth_pending);
+        assert!(core.auth_request_id.is_none());
+    }
+
+    #[test]
+    fn test_retry_logic_with_limits() {
+        let config = ProtocolCoreConfig {
+            auto_login: AutoLogin::Disabled,
+            reestablish_after: IggyDuration::from_str("0ms").unwrap(), // No 
wait
+            max_retries: Some(2),
+            reconnection_enabled: true,
+            reconnection_interval: IggyDuration::from_str("0ms").unwrap(), // 
No wait
+        };
+        let mut core = ProtocolCore::new(config);
+        let addr = SocketAddr::from_str("127.0.0.1:8080").unwrap();
+
+        core.desire_connect(addr).unwrap();
+        
+        // First poll should return Connect
+        let action = core.poll();
+        assert!(matches!(action, ControlAction::Connect(_)));
+        assert_eq!(core.connection_retry_count, 1);
+        
+        // Simulate connection failure
+        core.disconnect();
+        core.desire_connect(addr).unwrap();
+        
+        // Second poll should return Connect
+        let action = core.poll();
+        assert!(matches!(action, ControlAction::Connect(_)));
+        assert_eq!(core.connection_retry_count, 2);
+        
+        // Simulate connection failure
+        core.disconnect();
+        core.desire_connect(addr).unwrap();
+        
+        // Third poll should return Error due to max retries exceeded
+        let action = core.poll();
+        assert!(matches!(action, 
ControlAction::Error(IggyError::MaxRetriesExceeded)));
+    }
+
+    #[test]
+    fn test_retry_logic_unlimited() {
+        let config = ProtocolCoreConfig {
+            auto_login: AutoLogin::Disabled,
+            reestablish_after: IggyDuration::from_str("0ms").unwrap(),
+            max_retries: None, // Unlimited
+            reconnection_enabled: true,
+            reconnection_interval: IggyDuration::from_str("0ms").unwrap(),
+        };
+        let mut core = ProtocolCore::new(config);
+        let addr = SocketAddr::from_str("127.0.0.1:8080").unwrap();
+
+        core.desire_connect(addr).unwrap();
+        
+        // Should always allow retries
+        for i in 1..=10 {
+            let action = core.poll();
+            assert!(matches!(action, ControlAction::Connect(_)));
+            assert_eq!(core.connection_retry_count, i);
+            
+            core.disconnect();
+            core.desire_connect(addr).unwrap();
+        }
+    }
+
+    #[test]
+    fn test_reconnection_disabled() {
+        let config = ProtocolCoreConfig {
+            auto_login: AutoLogin::Disabled,
+            reestablish_after: IggyDuration::from_str("0ms").unwrap(),
+            max_retries: Some(3),
+            reconnection_enabled: false,
+            reconnection_interval: IggyDuration::from_str("1s").unwrap(),
+        };
+        let mut core = ProtocolCore::new(config);
+        let addr = SocketAddr::from_str("127.0.0.1:8080").unwrap();
+
+        core.desire_connect(addr).unwrap();
+        
+        // First connection attempt should work
+        let action = core.poll();
+        assert!(matches!(action, ControlAction::Connect(_)));
+        
+        // Simulate failure and retry
+        core.disconnect();
+        core.desire_connect(addr).unwrap();
+        
+        // Should fail immediately when reconnection is disabled
+        let action = core.poll();
+        assert!(matches!(action, 
ControlAction::Error(IggyError::CannotEstablishConnection)));
+    }
+
+    #[test]
+    fn test_should_reconnect_for_error() {
+        let config = create_test_config();
+        let core = ProtocolCore::new(config);
+        
+        // Should reconnect for these errors
+        assert!(core.should_reconnect_for_error(&IggyError::Disconnected));
+        assert!(core.should_reconnect_for_error(&IggyError::EmptyResponse));
+        assert!(core.should_reconnect_for_error(&IggyError::Unauthenticated));
+        assert!(core.should_reconnect_for_error(&IggyError::StaleClient));
+        
+        // Should not reconnect for other errors
+        assert!(!core.should_reconnect_for_error(&IggyError::InvalidCommand));
+        
assert!(!core.should_reconnect_for_error(&IggyError::StreamIdNotFound(1)));
+    }
+
+    #[test]
+    fn test_should_reconnect_disabled() {
+        let config = ProtocolCoreConfig {
+            auto_login: AutoLogin::Disabled,
+            reestablish_after: IggyDuration::from_str("5s").unwrap(),
+            max_retries: Some(3),
+            reconnection_enabled: false,
+            reconnection_interval: IggyDuration::from_str("1s").unwrap(),
+        };
+        let core = ProtocolCore::new(config);
+        
+        // Should not reconnect when reconnection is disabled
+        assert!(!core.should_reconnect_for_error(&IggyError::Disconnected));
+        assert!(!core.should_reconnect_for_error(&IggyError::EmptyResponse));
+    }
+
+    #[test]
+    fn test_initiate_reconnection() {
+        let config = create_test_config();
+        let mut core = ProtocolCore::new(config);
+        let addr = SocketAddr::from_str("127.0.0.1:8080").unwrap();
+
+        // First establish connection
+        core.desire_connect(addr).unwrap();
+        core.on_connected().unwrap();
+        
+        // Initiate reconnection
+        core.initiate_reconnection(addr).unwrap();
+        
+        assert_eq!(core.state, ClientState::Connecting);
+        assert_eq!(core.server_address, Some(addr));
+    }
+
+    #[test]
+    fn test_queue_send_and_request_id() {
+        let config = create_test_config();
+        let mut core = ProtocolCore::new(config);
+        
+        let payload = Bytes::from("test message");
+        let request_id = core.queue_send(0x10, payload.clone());
+        
+        assert_eq!(request_id, 1);
+        assert_eq!(core.next_request_id, 2);
+        assert_eq!(core.pending_sends.len(), 1);
+        
+        let (code, queued_payload, queued_id) = &core.pending_sends[0];
+        assert_eq!(*code, 0x10);
+        assert_eq!(*queued_payload, payload);
+        assert_eq!(*queued_id, request_id);
+    }
+
+    #[test]
+    fn test_poll_transmit() {
+        let config = create_test_config();
+        let mut core = ProtocolCore::new(config);
+        
+        // Queue a message
+        let payload = Bytes::from("test");
+        core.queue_send(0x10, payload);
+        
+        // Poll should return the queued message
+        let tx_buf = core.poll_transmit();
+        assert!(tx_buf.is_some());
+        
+        let tx = tx_buf.unwrap();
+        // Extract command code from header (last 4 bytes)
+        let command_code = 
u32::from_le_bytes(tx.header[4..8].try_into().unwrap());
+        assert_eq!(command_code, 0x10);
+        assert_eq!(tx.payload, Bytes::from("test"));
+        
+        // Should be empty after polling
+        assert!(core.poll_transmit().is_none());
+    }
+
+    #[test] 
+    fn test_auth_with_username_password() {
+        let config = ProtocolCoreConfig {
+            auto_login: AutoLogin::Enabled(Credentials::UsernamePassword(
+                "user".to_string(),
+                "pass".to_string(),
+            )),
+            reestablish_after: IggyDuration::from_str("5s").unwrap(),
+            max_retries: Some(3),
+            reconnection_enabled: true,
+            reconnection_interval: IggyDuration::from_str("1s").unwrap(),
+        };
+        let mut core = ProtocolCore::new(config);
+        let addr = SocketAddr::from_str("127.0.0.1:8080").unwrap();
+
+        core.desire_connect(addr).unwrap();
+        core.on_connected().unwrap();
+        
+        assert_eq!(core.state, ClientState::Authenticating);
+        assert!(core.auth_pending);
+        assert!(core.auth_request_id.is_some());
+        assert!(!core.pending_sends.is_empty());
+    }
+
+    #[test]
+    fn test_auth_with_pat() {
+        let config = ProtocolCoreConfig {
+            auto_login: AutoLogin::Enabled(Credentials::PersonalAccessToken(
+                "test-token".to_string(),
+            )),
+            reestablish_after: IggyDuration::from_str("5s").unwrap(),
+            max_retries: Some(3),
+            reconnection_enabled: true,
+            reconnection_interval: IggyDuration::from_str("1s").unwrap(),
+        };
+        let mut core = ProtocolCore::new(config);
+        let addr = SocketAddr::from_str("127.0.0.1:8080").unwrap();
+
+        core.desire_connect(addr).unwrap();
+        core.on_connected().unwrap();
+        
+        assert_eq!(core.state, ClientState::Authenticating);
+        assert!(core.auth_pending);
+        assert!(core.auth_request_id.is_some());
+        assert!(!core.pending_sends.is_empty());
+    }
+
+    #[test]
+    fn test_get_retry_info() {
+        let config = ProtocolCoreConfig {
+            auto_login: AutoLogin::Disabled,
+            reestablish_after: IggyDuration::from_str("5s").unwrap(),
+            max_retries: Some(5),
+            reconnection_enabled: true,
+            reconnection_interval: IggyDuration::from_str("2s").unwrap(),
+        };
+        let mut core = ProtocolCore::new(config);
+        
+        core.connection_retry_count = 3;
+        let (count, max_str, interval_str) = core.get_retry_info();
+        
+        assert_eq!(count, 3);
+        assert_eq!(max_str, "5");
+        assert_eq!(interval_str, "2s");
+    }
+
+    #[test]
+    fn test_get_retry_info_unlimited() {
+        let config = ProtocolCoreConfig {
+            auto_login: AutoLogin::Disabled,
+            reestablish_after: IggyDuration::from_str("5s").unwrap(),
+            max_retries: None,
+            reconnection_enabled: true,
+            reconnection_interval: IggyDuration::from_str("1s").unwrap(),
+        };
+        let core = ProtocolCore::new(config);
+        
+        let (_, max_str, _) = core.get_retry_info();
+        assert_eq!(max_str, "unlimited");
+    }
+}
diff --git a/core/sdk/src/protocol/mod.rs b/core/sdk/src/protocol/mod.rs
new file mode 100644
index 00000000..90d7cc36
--- /dev/null
+++ b/core/sdk/src/protocol/mod.rs
@@ -0,0 +1 @@
+pub(crate) mod core;
diff --git a/core/sdk/src/tcp/mod.rs b/core/sdk/src/tcp/mod.rs
index c073ff0e..e98605c4 100644
--- a/core/sdk/src/tcp/mod.rs
+++ b/core/sdk/src/tcp/mod.rs
@@ -22,3 +22,5 @@ pub(crate) mod tcp_connection_stream_kind;
 pub(crate) mod tcp_stream;
 pub(crate) mod tcp_tls_connection_stream;
 pub(crate) mod tcp_tls_verifier;
+
+pub mod tcp_client_sync;
diff --git a/core/sdk/src/tcp/tcp_client_sync.rs 
b/core/sdk/src/tcp/tcp_client_sync.rs
new file mode 100644
index 00000000..c60cb8f5
--- /dev/null
+++ b/core/sdk/src/tcp/tcp_client_sync.rs
@@ -0,0 +1,862 @@
+use std::{
+    fmt::Debug,
+    io::{IoSlice, Read, Write},
+    mem::MaybeUninit,
+    net::SocketAddr,
+    ops::DerefMut,
+    str::FromStr,
+    sync::{Arc, Mutex},
+};
+
+use async_broadcast::{self, Receiver, Sender, broadcast};
+use async_trait::async_trait;
+use bytes::{BufMut, Bytes, BytesMut};
+use iggy_binary_protocol::{BinaryClient, BinaryTransport, Client};
+use iggy_common::{
+    AutoLogin, ClientState, Command, ConnectionString, ConnectionStringUtils, 
+    DiagnosticEvent, IggyDuration, IggyError, IggyTimestamp, TcpClientConfig, 
TcpConnectionStringOptions,
+    TransportProtocol,
+};
+use tracing::{debug, error, trace};
+
+use crate::{
+    connection::transport::{ClientConfig, Transport, TcpTransport, 
TcpTlsTransport},
+    protocol::core::{ControlAction, ProtocolCore, ProtocolCoreConfig, TxBuf},
+};
+
+#[derive(Debug)]
+pub struct TcpClientSync<T>
+where
+    T: Transport + Debug,
+    T::Config: ClientConfig,
+{
+    pub(crate) stream_factory: Arc<T>,
+    pub(crate) config: Arc<T::Config>,
+    inner: Mutex<ProtocolCore>,
+    stream: Mutex<Option<T::Stream>>,
+    events: (Sender<DiagnosticEvent>, Receiver<DiagnosticEvent>),
+    recv_buffer: Mutex<BytesMut>,
+    client_address: Mutex<Option<SocketAddr>>,
+    connected_at: Mutex<Option<IggyTimestamp>>,
+}
+
+impl<T> TcpClientSync<T>
+where
+    T: Transport + Debug,
+    T::Config: ClientConfig,
+{
+    pub fn create(stream_factory: Arc<T>, config: Arc<T::Config>) -> 
Result<Self, IggyError> {
+        let proto_config = ProtocolCoreConfig {
+            auto_login: config.auto_login(),
+            reestablish_after: config.reconnection_reestablish_after(),
+            max_retries: config.reconnection_max_retries(),
+            reconnection_enabled: config.reconnection_enabled(),
+            reconnection_interval: config.reconnection_interval(),
+        };
+        let (tx, rx) = broadcast(1000);
+
+        Ok(Self {
+            stream_factory,
+            config,
+            inner: Mutex::new(ProtocolCore::new(proto_config)),
+            stream: Mutex::new(None),
+            recv_buffer: Mutex::new(BytesMut::with_capacity(16 * 1024)),
+            events: (tx, rx),
+            client_address: Mutex::new(None),
+            connected_at: Mutex::new(None),
+        })
+    }
+
+    async fn get_client_address_value(&self) -> String {
+        let client_address = self.client_address.lock().unwrap();
+        if let Some(client_address) = &*client_address {
+            client_address.to_string()
+        } else {
+            "unknown".to_string()
+        }
+    }
+}
+
+#[async_trait]
+impl<T> Client for TcpClientSync<T>
+where
+    T: Transport + Send + Sync + 'static + Debug,
+    T::Config: Send + Sync + Debug,
+    T::Stream: Send + Sync + Debug,
+{
+    async fn connect(&self) -> Result<(), IggyError> {
+        let address = self.config.server_address();
+        let config = self.config.clone();
+        
+        let stream = {
+            let mut core = self.inner.lock().unwrap();
+            let mut recv_buf = self.recv_buffer.lock().unwrap();
+            Self::connect(&mut core, address, config, &mut recv_buf)?
+        };
+
+        if let Some(stream) = stream {
+            *self.stream.lock().unwrap() = Some(stream);
+            
+            let now = IggyTimestamp::now();
+            *self.connected_at.lock().unwrap() = Some(now);
+            
+            self.publish_event(DiagnosticEvent::Connected).await;
+            
+            let client_address = self.get_client_address_value().await;
+            debug!("TcpClientSync client: {client_address} has connected to 
server at: {now}");
+        }
+
+        Ok(())
+    }
+
+    async fn disconnect(&self) -> Result<(), IggyError> {
+        if self.get_state().await == ClientState::Disconnected {
+            return Ok(());
+        }
+
+        let client_address = self.get_client_address_value().await;
+        debug!("TcpClientSync client: {client_address} is disconnecting from 
server...");
+        
+        // Scope the mutex guards to ensure they're dropped before any await
+        {
+            let mut core = self.inner.lock().unwrap();
+            core.disconnect();
+            *self.stream.lock().unwrap() = None;
+        }
+        
+        self.publish_event(DiagnosticEvent::Disconnected).await;
+        
+        let now = IggyTimestamp::now();
+        debug!("TcpClientSync client: {client_address} has disconnected from 
server at: {now}.");
+        Ok(())
+    }
+
+    async fn shutdown(&self) -> Result<(), IggyError> {
+        if self.get_state().await == ClientState::Shutdown {
+            return Ok(());
+        }
+
+        {
+            let mut core = self.inner.lock().unwrap();
+            let stream = self.stream.lock().unwrap().take();
+            if let Some(mut stream) = stream {
+                if let Err(e) = T::shutdown(&mut stream) {
+                    error!("Failed to shutdown stream gracefully: {}", e);
+                }
+            }
+            core.shutdown();
+        }
+        
+        self.publish_event(DiagnosticEvent::Shutdown).await;
+        Ok(())
+    }
+
+    async fn subscribe_events(&self) -> 
async_broadcast::Receiver<iggy_common::DiagnosticEvent> {
+        self.events.1.clone()
+    }
+}
+
+#[async_trait]
+impl<T> BinaryTransport for TcpClientSync<T>
+where
+    T: Transport + Send + Sync + 'static + Debug,
+    T::Config: Send + Sync + Debug,
+    T::Stream: Send + Sync + Debug,
+{
+    async fn get_state(&self) -> ClientState {
+        self.inner.lock().unwrap().state
+    }
+
+    async fn set_state(&self, client_state: ClientState) {
+        let mut core = self.inner.lock().unwrap();
+        core.state = client_state
+    }
+
+    async fn publish_event(&self, event: DiagnosticEvent) {
+        if let Err(error) = self.events.0.broadcast(event).await {
+            error!("Failed to send a TCP diagnostic event: {error}");
+        }
+    }
+
+    fn get_heartbeat_interval(&self) -> IggyDuration {
+        self.config.heartbeat_interval()
+    }
+
+    async fn send_with_response<C: Command>(&self, command: &C) -> 
Result<Bytes, IggyError> {
+        command.validate()?;
+        self.send_raw_with_response(command.code(), command.to_bytes())
+            .await
+    }
+
+    async fn send_raw_with_response(&self, code: u32, payload: Bytes) -> 
Result<Bytes, IggyError> {
+        let result = self.send_raw(code, payload.clone()).await;
+        if result.is_ok() {
+            return result;
+        }
+
+        let error = result.unwrap_err();
+        
+        // Check if we should attempt reconnection using ProtocolCore logic
+        let should_reconnect = {
+            let core = self.inner.lock().unwrap();
+            core.should_reconnect_for_error(&error)
+        };
+
+        if !should_reconnect {
+            return Err(error);
+        }
+
+        // Perform reconnection
+        let server_address = self.config.server_address();
+        {
+            let mut core = self.inner.lock().unwrap();
+            core.initiate_reconnection(server_address)?;
+        }
+
+        // Attempt to reconnect
+        self.connect().await?;
+        self.send_raw(code, payload).await
+    }
+}
+
+impl<T> BinaryClient for TcpClientSync<T>
+where
+    T: Transport + Send + Sync + 'static + Debug,
+    T::Config: Send + Sync + Debug,
+    T::Stream: Send + Sync + Debug,
+{
+}
+
+impl<T> TcpClientSync<T>
+where
+    T: Transport + Debug,
+{
+    fn connect(
+        core: &mut ProtocolCore,
+        address: SocketAddr,
+        config: Arc<T::Config>,
+        recv_buf: &mut BytesMut,
+    ) -> Result<Option<T::Stream>, IggyError> {
+        let current_state = core.state;
+        if matches!(
+            current_state,
+            ClientState::Connected | ClientState::Authenticating | 
ClientState::Authenticated
+        ) {
+            debug!(
+                "TcpClientSync: Already connected (state: {:?}), completing 
waiter immediately",
+                current_state
+            );
+            return Ok(None);
+        }
+        if matches!(current_state, ClientState::Connecting) {
+            debug!("TcpClientSync: Already connecting, adding to waiters");
+            return Ok(None);
+        }
+
+        core.desire_connect(address).map_err(|e| {
+            error!("TcpClientSync: desire_connect failed: {}", e.as_string());
+            e
+        })?;
+
+        let mut stream;
+        loop {
+            match core.poll() {
+                ControlAction::Connect(addr) => match 
T::connect(config.clone(), addr) {
+                    Ok(s) => {
+                        core.on_connected()?;
+                        stream = s;
+                        break;
+                    }
+                    Err(_) => {
+                        core.disconnect();
+                        return Err(IggyError::CannotEstablishConnection)
+                    }
+                },
+                ControlAction::Wait(duration) => {
+                    std::thread::sleep(duration.get_duration());
+                }
+                ControlAction::Error(err) => return Err(err),
+                ControlAction::Noop | ControlAction::Authenticate { .. } => 
return Ok(None),
+            }
+        }
+        debug!("TcpClientSync: Connection established");
+
+        if !core.should_wait_auth() {
+            return Ok(Some(stream));
+        }
+
+        let tx = core
+            .poll_transmit()
+            .ok_or(IggyError::IncorrectConnectionState)?;
+        write::<T>(&mut stream, tx)?;
+
+        trace!("Sent AUTH, waiting for a response...");
+
+        let _ = read::<T>(&mut stream, core, recv_buf);
+        let _ = match core.take_auth_result() {
+            Some(res) => res,
+            None => Err(IggyError::IncorrectConnectionState),
+        }?;
+
+        Ok(Some(stream))
+    }
+
+    async fn send_raw(&self, code: u32, payload: Bytes) -> Result<Bytes, 
IggyError> {
+        let mut core = self.inner.lock().unwrap();
+        let mut stream = self.stream.lock().unwrap();
+        let mut recv_buf = self.recv_buffer.lock().unwrap();
+        let s = match stream.deref_mut() {
+            Some(s) => s,
+            // TODO add error trace
+            None => return Err(IggyError::IncorrectConnectionState),
+        };
+
+        core.send(code, payload)?;
+        if let Some(tx) = core.poll_transmit() {
+            write::<T>(s, tx)?;
+            return read::<T>(s, core.deref_mut(), &mut recv_buf);
+        }
+        Ok(Bytes::new())
+    }
+}
+
+fn write<T: Transport>(stream: &mut T::Stream, tx: TxBuf) -> Result<(), 
IggyError> {
+    let mut off = 0usize;
+    let total = tx.total_len();
+
+    while off < total {
+        let mut slices = [IoSlice::new(&[]), IoSlice::new(&[])];
+        let iov = if off < 8 {
+            slices[0] = IoSlice::new(&tx.header[off..]);
+            if !tx.payload.is_empty() {
+                slices[1] = IoSlice::new(&tx.payload);
+                &slices[..2]
+            } else {
+                &slices[..1]
+            }
+        } else {
+            let body_off = off - 8;
+            slices[0] = IoSlice::new(&tx.payload[body_off..]);
+            &slices[..1]
+        };
+
+        let n = stream
+            .write_vectored(iov)
+            .map_err(|_| IggyError::CannotEstablishConnection)?;
+        if n == 0 {
+            return Err(IggyError::CannotEstablishConnection);
+        }
+        off += n;
+    }
+
+    stream
+        .flush()
+        .map_err(|_| IggyError::CannotEstablishConnection)?;
+    Ok(())
+}
+
+fn read<T: Transport>(
+    stream: &mut T::Stream,
+    core: &mut ProtocolCore,
+    recv_buf: &mut BytesMut,
+) -> Result<Bytes, IggyError> {
+    let mut result: Option<Result<Bytes, IggyError>> = None;
+    loop {
+        if recv_buf.spare_capacity_mut().is_empty() {
+            recv_buf.reserve(8192);
+        }
+
+        let spare: &mut [MaybeUninit<u8>] = recv_buf.spare_capacity_mut();
+
+        let buf: &mut [u8] = unsafe { &mut *(spare as *mut [MaybeUninit<u8>] 
as *mut [u8]) };
+
+        let n = match stream.read(buf) {
+            Ok(0) => {
+                core.disconnect();
+                return Err(IggyError::CannotEstablishConnection);
+            }
+            Ok(n) => n,
+            Err(e) => {
+                error!("TcpClientSync: read error: {}", e);
+                return Err(IggyError::CannotEstablishConnection);
+            }
+        };
+
+        unsafe {
+            recv_buf.advance_mut(n);
+        }
+
+        core.process_incoming_with(recv_buf, |_, status: u32, payload| {
+            if status == 0 {
+                result = Some(Ok(payload));
+            } else {
+                result = Some(Err(IggyError::from_code(status)));
+            }
+        });
+        if result.is_some() {
+            return result.unwrap();
+        }
+    }
+}
+
+// Specific implementations for TCP
+impl TcpClientSync<TcpTransport> {
+    /// Create a new TCP client for the provided server address.
+    pub fn new(
+        server_address: &str,
+        auto_sign_in: AutoLogin,
+        heartbeat_interval: IggyDuration,
+    ) -> Result<Self, IggyError> {
+        Self::create(
+            Arc::new(TcpTransport),
+            Arc::new(TcpClientConfig {
+                heartbeat_interval,
+                server_address: server_address.to_string(),
+                auto_login: auto_sign_in,
+                ..Default::default()
+            }),
+        )
+    }
+
+    /// Create a new TCP client from the provided connection string.
+    pub fn from_connection_string(connection_string: &str) -> Result<Self, 
IggyError> {
+        if ConnectionStringUtils::parse_protocol(connection_string)? != 
TransportProtocol::Tcp {
+            return Err(IggyError::InvalidConnectionString);
+        }
+
+        Self::create(
+            Arc::new(TcpTransport),
+            Arc::new(
+                
ConnectionString::<TcpConnectionStringOptions>::from_str(connection_string)?.into(),
+            ),
+        )
+    }
+
+    /// Create a new TCP client based on the provided configuration.
+    pub fn create_tcp(config: Arc<TcpClientConfig>) -> Result<Self, IggyError> 
{
+        Self::create(Arc::new(TcpTransport), config)
+    }
+}
+
+// Type aliases for convenience
+pub type TcpClientSyncTcp = TcpClientSync<TcpTransport>;
+pub type TcpClientSyncTls = TcpClientSync<TcpTlsTransport>;
+
+impl TcpClientSyncTls {
+    /// Create a new TLS TCP client from the provided connection string.
+    pub fn from_connection_string_tls(connection_string: &str) -> Result<Self, 
IggyError> {
+        if ConnectionStringUtils::parse_protocol(connection_string)? != 
TransportProtocol::Tcp {
+            return Err(IggyError::InvalidConnectionString);
+        }
+
+        Self::create(
+            Arc::new(TcpTlsTransport),
+            Arc::new(
+                
ConnectionString::<TcpConnectionStringOptions>::from_str(connection_string)?.into(),
+            ),
+        )
+    }
+
+    /// Create a new TLS TCP client based on the provided configuration.
+    pub fn create_tcp_tls(config: Arc<TcpClientConfig>) -> Result<Self, 
IggyError> {
+        Self::create(Arc::new(TcpTlsTransport), config)
+    }
+
+    /// Create a new TLS TCP client for the provided server address.
+    pub fn new_tls(
+        server_address: &str,
+        domain: &str,
+        auto_sign_in: AutoLogin,
+        heartbeat_interval: IggyDuration,
+    ) -> Result<Self, IggyError> {
+        Self::create(
+            Arc::new(TcpTlsTransport),
+            Arc::new(TcpClientConfig {
+                heartbeat_interval,
+                server_address: server_address.to_string(),
+                tls_enabled: true,
+                tls_domain: domain.to_string(),
+                auto_login: auto_sign_in,
+                ..Default::default()
+            }),
+        )
+    }
+}
+
+impl Default for TcpClientSync<TcpTransport> {
+    fn default() -> Self {
+        
TcpClientSync::create_tcp(Arc::new(TcpClientConfig::default())).unwrap()
+    }
+}
+
+/// Unit tests for TcpClientSync.
+/// Tests connection string parsing, configuration validation, and 
constructors.
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use iggy_common::Credentials;
+
+    #[test]
+    fn should_fail_with_empty_connection_string() {
+        let value = "";
+        let tcp_client = 
TcpClientSync::<TcpTransport>::from_connection_string(value);
+        assert!(tcp_client.is_err());
+    }
+
+    #[test]
+    fn should_fail_without_username() {
+        let connection_string_prefix = "iggy+";
+        let protocol = TransportProtocol::Tcp;
+        let server_address = "127.0.0.1";
+        let port = "1234";
+        let username = "";
+        let password = "secret";
+        let value = format!(
+            
"{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}"
+        );
+        let tcp_client = 
TcpClientSync::<TcpTransport>::from_connection_string(&value);
+        assert!(tcp_client.is_err());
+    }
+
+    #[test]
+    fn should_fail_without_password() {
+        let connection_string_prefix = "iggy+";
+        let protocol = TransportProtocol::Tcp;
+        let server_address = "127.0.0.1";
+        let port = "1234";
+        let username = "user";
+        let password = "";
+        let value = format!(
+            
"{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}"
+        );
+        let tcp_client = 
TcpClientSync::<TcpTransport>::from_connection_string(&value);
+        assert!(tcp_client.is_err());
+    }
+
+    #[test]
+    fn should_fail_without_server_address() {
+        let connection_string_prefix = "iggy+";
+        let protocol = TransportProtocol::Tcp;
+        let server_address = "";
+        let port = "1234";
+        let username = "user";
+        let password = "secret";
+        let value = format!(
+            
"{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}"
+        );
+        let tcp_client = 
TcpClientSync::<TcpTransport>::from_connection_string(&value);
+        assert!(tcp_client.is_err());
+    }
+
+    #[test]
+    fn should_fail_without_port() {
+        let connection_string_prefix = "iggy+";
+        let protocol = TransportProtocol::Tcp;
+        let server_address = "127.0.0.1";
+        let port = "";
+        let username = "user";
+        let password = "secret";
+        let value = format!(
+            
"{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}"
+        );
+        let tcp_client = 
TcpClientSync::<TcpTransport>::from_connection_string(&value);
+        assert!(tcp_client.is_err());
+    }
+
+    #[test]
+    fn should_fail_with_invalid_prefix() {
+        let connection_string_prefix = "invalid+";
+        let protocol = TransportProtocol::Tcp;
+        let server_address = "127.0.0.1";
+        let port = "1234";
+        let username = "user";
+        let password = "secret";
+        let value = format!(
+            
"{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}"
+        );
+        let tcp_client = 
TcpClientSync::<TcpTransport>::from_connection_string(&value);
+        assert!(tcp_client.is_err());
+    }
+
+    #[test]
+    fn should_fail_with_unmatch_protocol() {
+        let connection_string_prefix = "iggy+";
+        let protocol = TransportProtocol::Quic;
+        let server_address = "127.0.0.1";
+        let port = "1234";
+        let username = "user";
+        let password = "secret";
+        let value = format!(
+            
"{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}"
+        );
+        let tcp_client = 
TcpClientSync::<TcpTransport>::from_connection_string(&value);
+        assert!(tcp_client.is_err());
+    }
+
+    #[test]
+    fn should_succeed_with_default_prefix() {
+        let default_connection_string_prefix = "iggy://";
+        let server_address = "127.0.0.1";
+        let port = "1234";
+        let username = "user";
+        let password = "secret";
+        let value = format!(
+            
"{default_connection_string_prefix}{username}:{password}@{server_address}:{port}"
+        );
+        let tcp_client = 
TcpClientSync::<TcpTransport>::from_connection_string(&value);
+        assert!(tcp_client.is_ok());
+    }
+
+    #[test]
+    fn should_fail_with_invalid_options() {
+        let connection_string_prefix = "iggy+";
+        let protocol = TransportProtocol::Tcp;
+        let server_address = "127.0.0.1";
+        let port = "";
+        let username = "user";
+        let password = "secret";
+        let value = format!(
+            
"{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}?invalid_option=invalid"
+        );
+        let tcp_client = 
TcpClientSync::<TcpTransport>::from_connection_string(&value);
+        assert!(tcp_client.is_err());
+    }
+
+    #[test]
+    fn should_succeed_without_options() {
+        let connection_string_prefix = "iggy+";
+        let protocol = TransportProtocol::Tcp;
+        let server_address = "127.0.0.1";
+        let port = "1234";
+        let username = "user";
+        let password = "secret";
+        let value = format!(
+            
"{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}"
+        );
+        let tcp_client = 
TcpClientSync::<TcpTransport>::from_connection_string(&value);
+        assert!(tcp_client.is_ok());
+
+        let tcp_client_sync = tcp_client.unwrap();
+        assert_eq!(
+            tcp_client_sync.config.server_address,
+            format!("{server_address}:{port}")
+        );
+        assert_eq!(
+            tcp_client_sync.config.auto_login,
+            AutoLogin::Enabled(Credentials::UsernamePassword(
+                username.to_string(),
+                password.to_string()
+            ))
+        );
+
+        assert!(!tcp_client_sync.config.tls_enabled);
+        assert!(tcp_client_sync.config.tls_domain.is_empty());
+        assert!(tcp_client_sync.config.tls_ca_file.is_none());
+        assert_eq!(
+            tcp_client_sync.config.heartbeat_interval,
+            IggyDuration::from_str("5s").unwrap()
+        );
+
+        assert!(tcp_client_sync.config.reconnection.enabled);
+        assert!(tcp_client_sync.config.reconnection.max_retries.is_none());
+        assert_eq!(
+            tcp_client_sync.config.reconnection.interval,
+            IggyDuration::from_str("1s").unwrap()
+        );
+        assert_eq!(
+            tcp_client_sync.config.reconnection.reestablish_after,
+            IggyDuration::from_str("5s").unwrap()
+        );
+    }
+
+    #[test]
+    fn should_succeed_with_options() {
+        let connection_string_prefix = "iggy+";
+        let protocol = TransportProtocol::Tcp;
+        let server_address = "127.0.0.1";
+        let port = "1234";
+        let username = "user";
+        let password = "secret";
+        let heartbeat_interval = "10s";
+        let reconnection_retries = "10";
+        let value = format!(
+            
"{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}?heartbeat_interval={heartbeat_interval}&reconnection_retries={reconnection_retries}"
+        );
+        let tcp_client = 
TcpClientSync::<TcpTransport>::from_connection_string(&value);
+        assert!(tcp_client.is_ok());
+
+        let tcp_client_sync = tcp_client.unwrap();
+        assert_eq!(
+            tcp_client_sync.config.server_address,
+            format!("{server_address}:{port}")
+        );
+        assert_eq!(
+            tcp_client_sync.config.auto_login,
+            AutoLogin::Enabled(Credentials::UsernamePassword(
+                username.to_string(),
+                password.to_string()
+            ))
+        );
+
+        assert!(!tcp_client_sync.config.tls_enabled);
+        assert!(tcp_client_sync.config.tls_domain.is_empty());
+        assert!(tcp_client_sync.config.tls_ca_file.is_none());
+        assert_eq!(
+            tcp_client_sync.config.heartbeat_interval,
+            IggyDuration::from_str(heartbeat_interval).unwrap()
+        );
+
+        assert!(tcp_client_sync.config.reconnection.enabled);
+        assert_eq!(
+            tcp_client_sync.config.reconnection.max_retries.unwrap(),
+            reconnection_retries.parse::<u32>().unwrap()
+        );
+        assert_eq!(
+            tcp_client_sync.config.reconnection.interval,
+            IggyDuration::from_str("1s").unwrap()
+        );
+        assert_eq!(
+            tcp_client_sync.config.reconnection.reestablish_after,
+            IggyDuration::from_str("5s").unwrap()
+        );
+    }
+
+    #[test]
+    fn should_succeed_with_pat() {
+        let connection_string_prefix = "iggy+";
+        let protocol = TransportProtocol::Tcp;
+        let server_address = "127.0.0.1";
+        let port = "1234";
+        let pat = "iggypat-1234567890abcdef";
+        let value = 
format!("{connection_string_prefix}{protocol}://{pat}@{server_address}:{port}");
+        let tcp_client = 
TcpClientSync::<TcpTransport>::from_connection_string(&value);
+        assert!(tcp_client.is_ok());
+
+        let tcp_client_sync = tcp_client.unwrap();
+        assert_eq!(
+            tcp_client_sync.config.server_address,
+            format!("{server_address}:{port}")
+        );
+        assert_eq!(
+            tcp_client_sync.config.auto_login,
+            
AutoLogin::Enabled(Credentials::PersonalAccessToken(pat.to_string()))
+        );
+
+        assert!(!tcp_client_sync.config.tls_enabled);
+        assert!(tcp_client_sync.config.tls_domain.is_empty());
+        assert!(tcp_client_sync.config.tls_ca_file.is_none());
+        assert_eq!(
+            tcp_client_sync.config.heartbeat_interval,
+            IggyDuration::from_str("5s").unwrap()
+        );
+
+        assert!(tcp_client_sync.config.reconnection.enabled);
+        assert!(tcp_client_sync.config.reconnection.max_retries.is_none());
+        assert_eq!(
+            tcp_client_sync.config.reconnection.interval,
+            IggyDuration::from_str("1s").unwrap()
+        );
+        assert_eq!(
+            tcp_client_sync.config.reconnection.reestablish_after,
+            IggyDuration::from_str("5s").unwrap()
+        );
+    }
+
+    #[test]
+    fn should_create_tcp_client_with_new() {
+        let server_address = "127.0.0.1:8080";
+        let auto_login = AutoLogin::Enabled(Credentials::UsernamePassword(
+            "user".to_string(),
+            "pass".to_string(),
+        ));
+        let heartbeat_interval = IggyDuration::from_str("10s").unwrap();
+
+        let tcp_client = TcpClientSync::<TcpTransport>::new(
+            server_address,
+            auto_login.clone(),
+            heartbeat_interval,
+        );
+        assert!(tcp_client.is_ok());
+
+        let client = tcp_client.unwrap();
+        assert_eq!(client.config.server_address, server_address);
+        assert_eq!(client.config.auto_login, auto_login);
+        assert_eq!(client.config.heartbeat_interval, heartbeat_interval);
+        assert!(!client.config.tls_enabled);
+    }
+
+    #[test]
+    fn should_create_tls_client_with_new_tls() {
+        let server_address = "127.0.0.1:8080";
+        let domain = "localhost";
+        let auto_login = AutoLogin::Enabled(Credentials::UsernamePassword(
+            "user".to_string(),
+            "pass".to_string(),
+        ));
+        let heartbeat_interval = IggyDuration::from_str("10s").unwrap();
+
+        let tcp_client = TcpClientSyncTls::new_tls(
+            server_address,
+            domain,
+            auto_login.clone(),
+            heartbeat_interval,
+        );
+        assert!(tcp_client.is_ok());
+
+        let client = tcp_client.unwrap();
+        assert_eq!(client.config.server_address, server_address);
+        assert_eq!(client.config.auto_login, auto_login);
+        assert_eq!(client.config.heartbeat_interval, heartbeat_interval);
+        assert!(client.config.tls_enabled);
+        assert_eq!(client.config.tls_domain, domain);
+    }
+
+    #[test]
+    fn should_create_tcp_client_with_config() {
+        let config = Arc::new(TcpClientConfig {
+            server_address: "127.0.0.1:8080".to_string(),
+            auto_login: 
AutoLogin::Enabled(Credentials::PersonalAccessToken("token".to_string())),
+            tls_enabled: false,
+            ..Default::default()
+        });
+
+        let tcp_client = 
TcpClientSync::<TcpTransport>::create_tcp(config.clone());
+        assert!(tcp_client.is_ok());
+
+        let client = tcp_client.unwrap();
+        assert_eq!(client.config.server_address, config.server_address);
+        assert_eq!(client.config.auto_login, config.auto_login);
+        assert!(!client.config.tls_enabled);
+    }
+
+    #[test]
+    fn should_create_tls_client_with_config() {
+        let config = Arc::new(TcpClientConfig {
+            server_address: "127.0.0.1:8080".to_string(),
+            auto_login: 
AutoLogin::Enabled(Credentials::PersonalAccessToken("token".to_string())),
+            tls_enabled: true,
+            tls_domain: "localhost".to_string(),
+            ..Default::default()
+        });
+
+        let tcp_client = TcpClientSyncTls::create_tcp_tls(config.clone());
+        assert!(tcp_client.is_ok());
+
+        let client = tcp_client.unwrap();
+        assert_eq!(client.config.server_address, config.server_address);
+        assert_eq!(client.config.auto_login, config.auto_login);
+        assert!(client.config.tls_enabled);
+        assert_eq!(client.config.tls_domain, config.tls_domain);
+    }
+
+    #[test]
+    fn should_handle_default_client() {
+        let client = TcpClientSync::<TcpTransport>::default();
+        let default_config = TcpClientConfig::default();
+        
+        assert_eq!(client.config.server_address, 
default_config.server_address);
+        assert_eq!(client.config.auto_login, default_config.auto_login);
+        assert_eq!(client.config.heartbeat_interval, 
default_config.heartbeat_interval);
+        assert_eq!(client.config.tls_enabled, default_config.tls_enabled);
+    }
+}

Reply via email to