This is an automated email from the ASF dual-hosted git repository.

numinnex pushed a commit to branch integration_tests
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit ac96fb6910ac3ae11d51a2e34620392952cb3e08
Author: numinex <[email protected]>
AuthorDate: Wed May 13 16:35:09 2026 +0200

    progress
---
 Cargo.lock                                         |  12 +
 Cargo.toml                                         |   1 +
 core/common/Cargo.toml                             |   3 +
 .../traits/binary_impls/personal_access_tokens.rs  |  65 +++++-
 core/common/src/traits/binary_impls/users.rs       |  76 +++++-
 core/common/src/traits/binary_transport.rs         |  15 ++
 core/integration-vsr/Cargo.toml                    |  30 +++
 .../sdk/mod.rs => integration-vsr/src/lib.rs}      |   3 -
 .../sdk => integration-vsr/tests}/hello_world.rs   |   7 +-
 core/integration/tests/sdk/mod.rs                  |   1 -
 core/sdk/Cargo.toml                                |   5 +
 core/sdk/src/lib.rs                                |   2 +
 core/sdk/src/quic/quic_client.rs                   | 111 +++++++--
 core/sdk/src/tcp/tcp_client.rs                     | 150 ++++++++++--
 core/sdk/src/vsr.rs                                | 255 +++++++++++++++++++++
 core/sdk/src/websocket/websocket_client.rs         | 168 ++++++++++----
 16 files changed, 795 insertions(+), 109 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index fe4dfa0cf..0a6290bd8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6300,11 +6300,13 @@ dependencies = [
  "async-dropper",
  "async-trait",
  "bon",
+ "bytemuck",
  "bytes",
  "dashmap",
  "flume 0.12.0",
  "futures",
  "futures-util",
+ "iggy_binary_protocol",
  "iggy_common",
  "mockall",
  "quinn",
@@ -7080,6 +7082,16 @@ dependencies = [
  "zip 8.6.0",
 ]
 
+[[package]]
+name = "integration-vsr"
+version = "0.0.1"
+dependencies = [
+ "iggy",
+ "integration",
+ "serial_test",
+ "tokio",
+]
+
 [[package]]
 name = "interpolate_name"
 version = "0.2.4"
diff --git a/Cargo.toml b/Cargo.toml
index 6c8cb8434..5b06d5f05 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -49,6 +49,7 @@ members = [
     "core/consensus",
     "core/harness_derive",
     "core/integration",
+    "core/integration-vsr",
     "core/journal",
     "core/message_bus",
     "core/metadata",
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index cb5aabe61..8b00608fd 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -27,6 +27,9 @@ documentation = "https://iggy.apache.org/docs";
 repository = "https://github.com/apache/iggy";
 readme = "README.md"
 
+[features]
+vsr = []
+
 [dependencies]
 aes-gcm = { workspace = true }
 aligned-vec = { workspace = true }
diff --git a/core/common/src/traits/binary_impls/personal_access_tokens.rs 
b/core/common/src/traits/binary_impls/personal_access_tokens.rs
index f85f34d36..4c740d79d 100644
--- a/core/common/src/traits/binary_impls/personal_access_tokens.rs
+++ b/core/common/src/traits/binary_impls/personal_access_tokens.rs
@@ -27,15 +27,27 @@ use iggy_binary_protocol::WireName;
 use iggy_binary_protocol::codec::WireEncode;
 use iggy_binary_protocol::codes::{
     CREATE_PERSONAL_ACCESS_TOKEN_CODE, DELETE_PERSONAL_ACCESS_TOKEN_CODE,
-    GET_PERSONAL_ACCESS_TOKENS_CODE, LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE,
+    GET_PERSONAL_ACCESS_TOKENS_CODE,
 };
+#[cfg(feature = "vsr")]
+use iggy_binary_protocol::codes::LOGIN_REGISTER_WITH_PAT_CODE;
+#[cfg(not(feature = "vsr"))]
+use iggy_binary_protocol::codes::LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE;
 use iggy_binary_protocol::requests::personal_access_tokens::{
-    CreatePersonalAccessTokenRequest, DeletePersonalAccessTokenRequest,
-    GetPersonalAccessTokensRequest, LoginWithPersonalAccessTokenRequest,
+    CreatePersonalAccessTokenRequest, DeletePersonalAccessTokenRequest, 
GetPersonalAccessTokensRequest,
 };
+#[cfg(not(feature = "vsr"))]
+use 
iggy_binary_protocol::requests::personal_access_tokens::LoginWithPersonalAccessTokenRequest;
+#[cfg(feature = "vsr")]
+use iggy_binary_protocol::requests::users::LoginRegisterWithPatRequest;
 use 
iggy_binary_protocol::responses::personal_access_tokens::create_personal_access_token::RawPersonalAccessTokenResponse;
 use 
iggy_binary_protocol::responses::personal_access_tokens::get_personal_access_tokens::GetPersonalAccessTokensResponse;
+#[cfg(not(feature = "vsr"))]
 use iggy_binary_protocol::responses::users::login_user::IdentityResponse;
+#[cfg(feature = "vsr")]
+use iggy_binary_protocol::responses::users::LoginRegisterResponse;
+#[cfg(feature = "vsr")]
+use secrecy::SecretString;
 
 #[async_trait::async_trait]
 impl<B: BinaryClient> PersonalAccessTokenClient for B {
@@ -90,16 +102,63 @@ impl<B: BinaryClient> PersonalAccessTokenClient for B {
         &self,
         token: &str,
     ) -> Result<IdentityInfo, IggyError> {
+        #[cfg(feature = "vsr")]
+        {
+            let client_id = self.get_vsr_client_id().await?;
+            let response = match self
+                .send_raw_with_response(
+                    LOGIN_REGISTER_WITH_PAT_CODE,
+                    LoginRegisterWithPatRequest {
+                        client_id,
+                        token: SecretString::from(token.to_string()),
+                        version: Some(env!("CARGO_PKG_VERSION").to_string()),
+                        client_context: Some(String::new()),
+                    }
+                    .to_bytes(),
+                )
+                .await
+            {
+                Ok(response) => response,
+                Err(error) => {
+                    self.reset_vsr_session().await?;
+                    return Err(error);
+                }
+            };
+            let wire_resp = match 
super::decode_response::<LoginRegisterResponse>(&response) {
+                Ok(wire_resp) => wire_resp,
+                Err(error) => {
+                    self.reset_vsr_session().await?;
+                    return Err(error);
+                }
+            };
+            if let Err(error) = self.bind_vsr_session(wire_resp.session).await 
{
+                self.reset_vsr_session().await?;
+                return Err(error);
+            }
+            self.set_state(ClientState::Authenticated).await;
+            self.publish_event(DiagnosticEvent::SignedIn).await;
+            return Ok(IdentityInfo {
+                user_id: wire_resp.user_id,
+                access_token: None,
+            });
+        }
+
+        #[cfg(not(feature = "vsr"))]
         let wire_token = WireName::new(token).map_err(|_| 
IggyError::InvalidFormat)?;
+        #[cfg(not(feature = "vsr"))]
         let response = self
             .send_raw_with_response(
                 LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE,
                 LoginWithPersonalAccessTokenRequest { token: wire_token 
}.to_bytes(),
             )
             .await?;
+        #[cfg(not(feature = "vsr"))]
         self.set_state(ClientState::Authenticated).await;
+        #[cfg(not(feature = "vsr"))]
         self.publish_event(DiagnosticEvent::SignedIn).await;
+        #[cfg(not(feature = "vsr"))]
         let wire_resp = super::decode_response::<IdentityResponse>(&response)?;
+        #[cfg(not(feature = "vsr"))]
         Ok(IdentityInfo::from(wire_resp))
     }
 }
diff --git a/core/common/src/traits/binary_impls/users.rs 
b/core/common/src/traits/binary_impls/users.rs
index 24a877a4c..183d942ca 100644
--- a/core/common/src/traits/binary_impls/users.rs
+++ b/core/common/src/traits/binary_impls/users.rs
@@ -26,14 +26,27 @@ use iggy_binary_protocol::WireName;
 use iggy_binary_protocol::codec::WireEncode;
 use iggy_binary_protocol::codes::{
     CHANGE_PASSWORD_CODE, CREATE_USER_CODE, DELETE_USER_CODE, GET_USER_CODE, 
GET_USERS_CODE,
-    LOGIN_USER_CODE, LOGOUT_USER_CODE, UPDATE_PERMISSIONS_CODE, 
UPDATE_USER_CODE,
+    UPDATE_PERMISSIONS_CODE, UPDATE_USER_CODE,
 };
+#[cfg(feature = "vsr")]
+use iggy_binary_protocol::codes::LOGIN_REGISTER_CODE;
+#[cfg(not(feature = "vsr"))]
+use iggy_binary_protocol::codes::{LOGIN_USER_CODE, LOGOUT_USER_CODE};
 use iggy_binary_protocol::requests::users::{
     ChangePasswordRequest, CreateUserRequest, DeleteUserRequest, 
GetUserRequest, GetUsersRequest,
-    LoginUserRequest, LogoutUserRequest, UpdatePermissionsRequest, 
UpdateUserRequest,
+    UpdatePermissionsRequest, UpdateUserRequest,
 };
+#[cfg(feature = "vsr")]
+use iggy_binary_protocol::requests::users::LoginRegisterRequest;
+#[cfg(not(feature = "vsr"))]
+use iggy_binary_protocol::requests::users::{LoginUserRequest, 
LogoutUserRequest};
+#[cfg(feature = "vsr")]
+use iggy_binary_protocol::responses::users::LoginRegisterResponse;
+#[cfg(not(feature = "vsr"))]
 use iggy_binary_protocol::responses::users::login_user::IdentityResponse;
 use iggy_binary_protocol::responses::users::{GetUsersResponse, 
UserDetailsResponse};
+#[cfg(feature = "vsr")]
+use secrecy::SecretString;
 
 #[async_trait::async_trait]
 impl<B: BinaryClient> UserClient for B {
@@ -169,7 +182,52 @@ impl<B: BinaryClient> UserClient for B {
     }
 
     async fn login_user(&self, username: &str, password: &str) -> 
Result<IdentityInfo, IggyError> {
+        #[cfg(feature = "vsr")]
+        {
+            let wire_name = WireName::new(username).map_err(|_| 
IggyError::InvalidFormat)?;
+            let client_id = self.get_vsr_client_id().await?;
+            let response = match self
+                .send_raw_with_response(
+                    LOGIN_REGISTER_CODE,
+                    LoginRegisterRequest {
+                        client_id,
+                        username: wire_name,
+                        password: SecretString::from(password.to_string()),
+                        version: Some(env!("CARGO_PKG_VERSION").to_string()),
+                        client_context: Some(String::new()),
+                    }
+                    .to_bytes(),
+                )
+                .await
+            {
+                Ok(response) => response,
+                Err(error) => {
+                    self.reset_vsr_session().await?;
+                    return Err(error);
+                }
+            };
+            let wire_resp = match 
super::decode_response::<LoginRegisterResponse>(&response) {
+                Ok(wire_resp) => wire_resp,
+                Err(error) => {
+                    self.reset_vsr_session().await?;
+                    return Err(error);
+                }
+            };
+            if let Err(error) = self.bind_vsr_session(wire_resp.session).await 
{
+                self.reset_vsr_session().await?;
+                return Err(error);
+            }
+            self.set_state(ClientState::Authenticated).await;
+            self.publish_event(DiagnosticEvent::SignedIn).await;
+            return Ok(IdentityInfo {
+                user_id: wire_resp.user_id,
+                access_token: None,
+            });
+        }
+
+        #[cfg(not(feature = "vsr"))]
         let wire_name = WireName::new(username).map_err(|_| 
IggyError::InvalidFormat)?;
+        #[cfg(not(feature = "vsr"))]
         let response = self
             .send_raw_with_response(
                 LOGIN_USER_CODE,
@@ -182,18 +240,32 @@ impl<B: BinaryClient> UserClient for B {
                 .to_bytes(),
             )
             .await?;
+        #[cfg(not(feature = "vsr"))]
         self.set_state(ClientState::Authenticated).await;
+        #[cfg(not(feature = "vsr"))]
         self.publish_event(DiagnosticEvent::SignedIn).await;
+        #[cfg(not(feature = "vsr"))]
         let wire_resp = super::decode_response::<IdentityResponse>(&response)?;
+        #[cfg(not(feature = "vsr"))]
         Ok(IdentityInfo::from(wire_resp))
     }
 
     async fn logout_user(&self) -> Result<(), IggyError> {
+        #[cfg(feature = "vsr")]
+        {
+            return Err(IggyError::FeatureUnavailable);
+        }
+
+        #[cfg(not(feature = "vsr"))]
         fail_if_not_authenticated(self).await?;
+        #[cfg(not(feature = "vsr"))]
         self.send_raw_with_response(LOGOUT_USER_CODE, 
LogoutUserRequest.to_bytes())
             .await?;
+        #[cfg(not(feature = "vsr"))]
         self.set_state(ClientState::Connected).await;
+        #[cfg(not(feature = "vsr"))]
         self.publish_event(DiagnosticEvent::SignedOut).await;
+        #[cfg(not(feature = "vsr"))]
         Ok(())
     }
 }
diff --git a/core/common/src/traits/binary_transport.rs 
b/core/common/src/traits/binary_transport.rs
index 4f6af3ba8..99dd73393 100644
--- a/core/common/src/traits/binary_transport.rs
+++ b/core/common/src/traits/binary_transport.rs
@@ -29,4 +29,19 @@ pub trait BinaryTransport {
     async fn publish_event(&self, event: DiagnosticEvent);
     async fn send_raw_with_response(&self, code: u32, payload: Bytes) -> 
Result<Bytes, IggyError>;
     fn get_heartbeat_interval(&self) -> IggyDuration;
+
+    #[cfg(feature = "vsr")]
+    async fn get_vsr_client_id(&self) -> Result<u128, IggyError> {
+        Err(IggyError::FeatureUnavailable)
+    }
+
+    #[cfg(feature = "vsr")]
+    async fn bind_vsr_session(&self, _session: u64) -> Result<(), IggyError> {
+        Err(IggyError::FeatureUnavailable)
+    }
+
+    #[cfg(feature = "vsr")]
+    async fn reset_vsr_session(&self) -> Result<(), IggyError> {
+        Err(IggyError::FeatureUnavailable)
+    }
 }
diff --git a/core/integration-vsr/Cargo.toml b/core/integration-vsr/Cargo.toml
new file mode 100644
index 000000000..4f956b2b9
--- /dev/null
+++ b/core/integration-vsr/Cargo.toml
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "integration-vsr"
+version = "0.0.1"
+edition = "2024"
+license = "Apache-2.0"
+publish = false
+
+[dependencies]
+iggy = { workspace = true, features = ["vsr"] }
+integration = { path = "../integration" }
+serial_test = { workspace = true }
+tokio = { workspace = true, features = ["full", "test-util"] }
+
diff --git a/core/integration/tests/sdk/mod.rs b/core/integration-vsr/src/lib.rs
similarity index 96%
copy from core/integration/tests/sdk/mod.rs
copy to core/integration-vsr/src/lib.rs
index 05acd11c6..31bd66e6e 100644
--- a/core/integration/tests/sdk/mod.rs
+++ b/core/integration-vsr/src/lib.rs
@@ -15,6 +15,3 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-mod hello_world;
-mod producer;
diff --git a/core/integration/tests/sdk/hello_world.rs 
b/core/integration-vsr/tests/hello_world.rs
similarity index 86%
rename from core/integration/tests/sdk/hello_world.rs
rename to core/integration-vsr/tests/hello_world.rs
index 42d5a25ba..bcb94f461 100644
--- a/core/integration/tests/sdk/hello_world.rs
+++ b/core/integration-vsr/tests/hello_world.rs
@@ -24,6 +24,9 @@ use integration::iggy_harness;
     server(executable_path = "iggy-server-ng")
 )]
 async fn hello_world(harness: &TestHarness) {
-    let client = harness.root_client().await.unwrap();
-    client.ping().await.unwrap();
+    let client = harness.new_client().await.unwrap();
+    client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
 }
diff --git a/core/integration/tests/sdk/mod.rs 
b/core/integration/tests/sdk/mod.rs
index 05acd11c6..6d94bfa61 100644
--- a/core/integration/tests/sdk/mod.rs
+++ b/core/integration/tests/sdk/mod.rs
@@ -16,5 +16,4 @@
  * under the License.
  */
 
-mod hello_world;
 mod producer;
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index 2056ae56d..091779e22 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -28,16 +28,21 @@ documentation = "https://iggy.apache.org/docs";
 repository = "https://github.com/apache/iggy";
 readme = "README.md"
 
+[features]
+vsr = ["iggy_common/vsr"]
+
 [dependencies]
 async-broadcast = { workspace = true }
 async-dropper = { workspace = true }
 async-trait = { workspace = true }
 bon = { workspace = true }
+bytemuck = { workspace = true }
 bytes = { workspace = true }
 dashmap = { workspace = true }
 flume = { workspace = true }
 futures = { workspace = true }
 futures-util = { workspace = true }
+iggy_binary_protocol = { workspace = true }
 iggy_common = { workspace = true }
 quinn = { workspace = true }
 reqwest = { workspace = true }
diff --git a/core/sdk/src/lib.rs b/core/sdk/src/lib.rs
index 35f0e1b73..9e738728e 100644
--- a/core/sdk/src/lib.rs
+++ b/core/sdk/src/lib.rs
@@ -28,4 +28,6 @@ pub mod quic;
 pub mod session;
 pub mod stream_builder;
 pub mod tcp;
+#[cfg(feature = "vsr")]
+mod vsr;
 pub mod websocket;
diff --git a/core/sdk/src/quic/quic_client.rs b/core/sdk/src/quic/quic_client.rs
index 0824e962a..7217bc319 100644
--- a/core/sdk/src/quic/quic_client.rs
+++ b/core/sdk/src/quic/quic_client.rs
@@ -18,6 +18,8 @@
 
 use crate::leader_aware::{LeaderRedirectionState, 
check_and_redirect_to_leader};
 use crate::prelude::AutoLogin;
+#[cfg(feature = "vsr")]
+use crate::session::ConsensusSession;
 use iggy_common::{BinaryClient, BinaryTransport, Client, 
PersonalAccessTokenClient, UserClient};
 
 use crate::prelude::{IggyDuration, IggyError, IggyTimestamp, QuicClientConfig};
@@ -41,7 +43,9 @@ use tokio::sync::Mutex;
 use tokio::time::sleep;
 use tracing::{error, info, trace, warn};
 
+#[cfg(not(feature = "vsr"))]
 const REQUEST_INITIAL_BYTES_LENGTH: usize = 4;
+#[cfg(not(feature = "vsr"))]
 const RESPONSE_INITIAL_BYTES_LENGTH: usize = 8;
 const NAME: &str = "Iggy";
 
@@ -56,6 +60,8 @@ pub struct QuicClient {
     pub(crate) connected_at: Mutex<Option<IggyTimestamp>>,
     leader_redirection_state: Mutex<LeaderRedirectionState>,
     pub(crate) current_server_address: Mutex<String>,
+    #[cfg(feature = "vsr")]
+    consensus_session: Arc<Mutex<ConsensusSession>>,
 }
 
 unsafe impl Send for QuicClient {}
@@ -139,6 +145,32 @@ impl BinaryTransport for QuicClient {
     fn get_heartbeat_interval(&self) -> IggyDuration {
         self.config.heartbeat_interval
     }
+
+    #[cfg(feature = "vsr")]
+    async fn get_vsr_client_id(&self) -> Result<u128, IggyError> {
+        Ok(self.consensus_session.lock().await.client_id())
+    }
+
+    #[cfg(feature = "vsr")]
+    async fn bind_vsr_session(&self, session: u64) -> Result<(), IggyError> {
+        if session == 0 {
+            return Err(IggyError::InvalidConfiguration);
+        }
+
+        let mut consensus_session = self.consensus_session.lock().await;
+        if consensus_session.is_bound() {
+            return Err(IggyError::InvalidConfiguration);
+        }
+
+        consensus_session.bind(session);
+        Ok(())
+    }
+
+    #[cfg(feature = "vsr")]
+    async fn reset_vsr_session(&self) -> Result<(), IggyError> {
+        *self.consensus_session.lock().await = ConsensusSession::new();
+        Ok(())
+    }
 }
 
 impl BinaryClient for QuicClient {}
@@ -205,6 +237,8 @@ impl QuicClient {
             connected_at: Mutex::new(None),
             leader_redirection_state: 
Mutex::new(LeaderRedirectionState::new()),
             current_server_address: Mutex::new(server_address),
+            #[cfg(feature = "vsr")]
+            consensus_session: Arc::new(Mutex::new(ConsensusSession::new())),
         })
     }
 
@@ -234,34 +268,43 @@ impl QuicClient {
             return Err(IggyError::EmptyResponse);
         }
 
-        let status = u32::from_le_bytes(
-            buffer[..4]
-                .try_into()
-                .map_err(|_| IggyError::InvalidNumberEncoding)?,
-        );
-        if status != 0 {
-            error!(
-                "Received an invalid response with status: {} ({}).",
-                status,
-                IggyError::from_code_as_string(status)
+        #[cfg(feature = "vsr")]
+        {
+            return crate::vsr::decode_response(&buffer);
+        }
+
+        #[cfg(not(feature = "vsr"))]
+        {
+            let status = u32::from_le_bytes(
+                buffer[..4]
+                    .try_into()
+                    .map_err(|_| IggyError::InvalidNumberEncoding)?,
             );
+            if status != 0 {
+                error!(
+                    "Received an invalid response with status: {} ({}).",
+                    status,
+                    IggyError::from_code_as_string(status)
+                );
 
-            return Err(IggyError::from_code(status));
-        }
+                return Err(IggyError::from_code(status));
+            }
 
-        let length = u32::from_le_bytes(
-            buffer[4..RESPONSE_INITIAL_BYTES_LENGTH]
-                .try_into()
-                .map_err(|_| IggyError::InvalidNumberEncoding)?,
-        );
-        trace!("Status: OK. Response length: {}", length);
-        if length <= 1 {
-            return Ok(Bytes::new());
-        }
+            let length = u32::from_le_bytes(
+                buffer[4..RESPONSE_INITIAL_BYTES_LENGTH]
+                    .try_into()
+                    .map_err(|_| IggyError::InvalidNumberEncoding)?,
+            );
+            trace!("Status: OK. Response length: {}", length);
+            if length <= 1 {
+                return Ok(Bytes::new());
+            }
 
-        Ok(Bytes::copy_from_slice(
-            
&buffer[RESPONSE_INITIAL_BYTES_LENGTH..RESPONSE_INITIAL_BYTES_LENGTH + length 
as usize],
-        ))
+            Ok(Bytes::copy_from_slice(
+                &buffer[RESPONSE_INITIAL_BYTES_LENGTH
+                    ..RESPONSE_INITIAL_BYTES_LENGTH + length as usize],
+            ))
+        }
     }
 
     async fn connect(&self) -> Result<(), IggyError> {
@@ -469,6 +512,8 @@ impl QuicClient {
         }
 
         self.endpoint.wait_idle().await;
+        #[cfg(feature = "vsr")]
+        self.reset_vsr_session().await?;
         self.set_state(ClientState::Shutdown).await;
         self.publish_event(DiagnosticEvent::Shutdown).await;
         info!("{NAME} QUIC client has been shutdown.");
@@ -487,6 +532,8 @@ impl QuicClient {
         self.set_state(ClientState::Disconnected).await;
         self.connection.lock().await.take();
         self.endpoint.wait_idle().await;
+        #[cfg(feature = "vsr")]
+        self.reset_vsr_session().await?;
         self.publish_event(DiagnosticEvent::Disconnected).await;
         let now = IggyTimestamp::now();
         info!(
@@ -521,26 +568,42 @@ impl QuicClient {
 
         let connection = self.connection.clone();
         let response_buffer_size = self.config.response_buffer_size;
+        #[cfg(feature = "vsr")]
+        let consensus_session = self.consensus_session.clone();
         // SAFETY: we run code holding the `connection` lock in a task so we 
can't be cancelled while holding the lock.
         tokio::spawn(async move {
             let connection = connection.lock().await;
             if let Some(connection) = connection.as_ref() {
+                #[cfg(feature = "vsr")]
+                let request = {
+                    let mut consensus_session = consensus_session.lock().await;
+                    crate::vsr::encode_request(&mut consensus_session, code, 
&payload)?
+                };
+                #[cfg(not(feature = "vsr"))]
                 let payload_length = payload.len() + 
REQUEST_INITIAL_BYTES_LENGTH;
                 let (mut send, mut recv) = 
connection.open_bi().await.map_err(|error| {
                     error!("Failed to open a bidirectional stream: {error}");
                     IggyError::QuicError
                 })?;
                 trace!("Sending a QUIC request with code: {code}");
+                #[cfg(feature = "vsr")]
+                send.write_all(&request).await.map_err(|error| {
+                    error!("Failed to write VSR request: {error}");
+                    IggyError::QuicError
+                })?;
+                #[cfg(not(feature = "vsr"))]
                 send.write_all(&(payload_length as u32).to_le_bytes())
                     .await
                     .map_err(|error| {
                         error!("Failed to write payload length: {error}");
                         IggyError::QuicError
                     })?;
+                #[cfg(not(feature = "vsr"))]
                 send.write_all(&code.to_le_bytes()).await.map_err(|error| {
                     error!("Failed to write payload code: {error}");
                     IggyError::QuicError
                 })?;
+                #[cfg(not(feature = "vsr"))]
                 send.write_all(&payload).await.map_err(|error| {
                     error!("Failed to write payload: {error}");
                     IggyError::QuicError
diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs
index 059ca4d02..5a1a1502e 100644
--- a/core/sdk/src/tcp/tcp_client.rs
+++ b/core/sdk/src/tcp/tcp_client.rs
@@ -19,6 +19,8 @@
 use crate::leader_aware::{LeaderRedirectionState, 
check_and_redirect_to_leader};
 use crate::prelude::Client;
 use crate::prelude::TcpClientConfig;
+#[cfg(feature = "vsr")]
+use crate::session::ConsensusSession;
 use crate::tcp::tcp_connection_stream::TcpConnectionStream;
 use crate::tcp::tcp_connection_stream_kind::ConnectionStreamKind;
 use crate::tcp::tcp_tls_connection_stream::TcpTlsConnectionStream;
@@ -27,9 +29,10 @@ use async_trait::async_trait;
 use bytes::{BufMut, Bytes, BytesMut};
 use iggy_common::{
     AutoLogin, ClientState, ConnectionString, ConnectionStringUtils, 
Credentials, DiagnosticEvent,
-    IggyDuration, IggyError, IggyErrorDiscriminants, IggyTimestamp, 
TcpConnectionStringOptions,
-    TransportProtocol,
+    IggyDuration, IggyError, IggyTimestamp, TcpConnectionStringOptions, 
TransportProtocol,
 };
+#[cfg(not(feature = "vsr"))]
+use iggy_common::IggyErrorDiscriminants;
 use iggy_common::{BinaryClient, BinaryTransport, PersonalAccessTokenClient, 
UserClient};
 use rustls::pki_types::{CertificateDer, ServerName, pem::PemObject};
 use secrecy::ExposeSecret;
@@ -42,7 +45,9 @@ use tokio::time::sleep;
 use tokio_rustls::{TlsConnector, TlsStream};
 use tracing::{error, info, trace, warn};
 
+#[cfg(not(feature = "vsr"))]
 const REQUEST_INITIAL_BYTES_LENGTH: usize = 4;
+#[cfg(not(feature = "vsr"))]
 const RESPONSE_INITIAL_BYTES_LENGTH: usize = 8;
 const NAME: &str = "Iggy";
 
@@ -58,6 +63,8 @@ pub struct TcpClient {
     pub(crate) connected_at: Mutex<Option<IggyTimestamp>>,
     leader_redirection_state: Mutex<LeaderRedirectionState>,
     pub(crate) current_server_address: Mutex<String>,
+    #[cfg(feature = "vsr")]
+    consensus_session: Arc<Mutex<ConsensusSession>>,
 }
 
 impl Default for TcpClient {
@@ -144,6 +151,32 @@ impl BinaryTransport for TcpClient {
     fn get_heartbeat_interval(&self) -> IggyDuration {
         self.config.heartbeat_interval
     }
+
+    #[cfg(feature = "vsr")]
+    async fn get_vsr_client_id(&self) -> Result<u128, IggyError> {
+        Ok(self.consensus_session.lock().await.client_id())
+    }
+
+    #[cfg(feature = "vsr")]
+    async fn bind_vsr_session(&self, session: u64) -> Result<(), IggyError> {
+        if session == 0 {
+            return Err(IggyError::InvalidConfiguration);
+        }
+
+        let mut consensus_session = self.consensus_session.lock().await;
+        if consensus_session.is_bound() {
+            return Err(IggyError::InvalidConfiguration);
+        }
+
+        consensus_session.bind(session);
+        Ok(())
+    }
+
+    #[cfg(feature = "vsr")]
+    async fn reset_vsr_session(&self) -> Result<(), IggyError> {
+        *self.consensus_session.lock().await = ConsensusSession::new();
+        Ok(())
+    }
 }
 
 impl BinaryClient for TcpClient {}
@@ -203,9 +236,12 @@ impl TcpClient {
             connected_at: Mutex::new(None),
             leader_redirection_state: 
Mutex::new(LeaderRedirectionState::new()),
             current_server_address: Mutex::new(server_address),
+            #[cfg(feature = "vsr")]
+            consensus_session: Arc::new(Mutex::new(ConsensusSession::new())),
         })
     }
 
+    #[cfg(not(feature = "vsr"))]
     async fn handle_response(
         status: u32,
         length: u32,
@@ -510,6 +546,8 @@ impl TcpClient {
         info!("{NAME} client: {client_address} is disconnecting from 
server...");
         self.set_state(ClientState::Disconnected).await;
         self.stream.lock().await.take();
+        #[cfg(feature = "vsr")]
+        self.reset_vsr_session().await?;
         self.publish_event(DiagnosticEvent::Disconnected).await;
         let now = IggyTimestamp::now();
         info!("{NAME} client: {client_address} has disconnected from server 
at: {now}.");
@@ -527,6 +565,8 @@ impl TcpClient {
         if let Some(mut stream) = stream {
             stream.shutdown().await?;
         }
+        #[cfg(feature = "vsr")]
+        self.reset_vsr_session().await?;
         self.set_state(ClientState::Shutdown).await;
         self.publish_event(DiagnosticEvent::Shutdown).await;
         info!("{NAME} TCP client: {client_address} has been shutdown.");
@@ -551,43 +591,103 @@ impl TcpClient {
         }
 
         let stream = self.stream.clone();
+        #[cfg(feature = "vsr")]
+        let consensus_session = self.consensus_session.clone();
         // SAFETY: we run code holding the `stream` lock in a task so we can't 
be cancelled while holding the lock.
         tokio::spawn(async move {
             let mut stream = stream.lock().await;
             if let Some(stream) = stream.as_mut() {
+                #[cfg(feature = "vsr")]
+                let request = {
+                    let mut consensus_session = consensus_session.lock().await;
+                    crate::vsr::encode_request(&mut consensus_session, code, 
&payload)?
+                };
+                #[cfg(not(feature = "vsr"))]
                 let payload_length = payload.len() + 
REQUEST_INITIAL_BYTES_LENGTH;
+                #[cfg(feature = "vsr")]
+                trace!(
+                    "Sending a TCP VSR request of size {} with code: {code}",
+                    request.len()
+                );
+                #[cfg(not(feature = "vsr"))]
                 trace!("Sending a TCP request of size {payload_length} with 
code: {code}");
+                #[cfg(feature = "vsr")]
+                stream.write(&request).await?;
+                #[cfg(not(feature = "vsr"))]
                 stream.write(&(payload_length as u32).to_le_bytes()).await?;
+                #[cfg(not(feature = "vsr"))]
                 stream.write(&code.to_le_bytes()).await?;
+                #[cfg(not(feature = "vsr"))]
                 stream.write(&payload).await?;
                 stream.flush().await?;
                 trace!("Sent a TCP request with code: {code}, waiting for a 
response...");
-                let mut response_buffer = [0u8; RESPONSE_INITIAL_BYTES_LENGTH];
-                let read_bytes = stream.read(&mut 
response_buffer).await.map_err(|error| {
-                    error!(
-                        "Failed to read response for TCP request with code: 
{code}: {error}",
-                        code = code,
-                        error = error
-                    );
-                    IggyError::Disconnected
-                })?;
+                #[cfg(feature = "vsr")]
+                {
+                    let mut response_header = [0u8; 
iggy_binary_protocol::HEADER_SIZE];
+                    let read_bytes = stream.read(&mut 
response_header).await.map_err(|error| {
+                        error!(
+                            "Failed to read VSR response header for TCP 
request with code: {code}: {error}",
+                            code = code,
+                            error = error
+                        );
+                        IggyError::Disconnected
+                    })?;
+
+                    if read_bytes != iggy_binary_protocol::HEADER_SIZE {
+                        error!("Received an invalid or empty VSR response 
header.");
+                        return Err(IggyError::EmptyResponse);
+                    }
+
+                    let response_size = 
crate::vsr::response_size(&response_header)?;
+                    let mut response = BytesMut::with_capacity(response_size);
+                    response.put_slice(&response_header);
+
+                    if response_size > iggy_binary_protocol::HEADER_SIZE {
+                        let body_size = response_size - 
iggy_binary_protocol::HEADER_SIZE;
+                        let mut body = vec![0u8; body_size];
+                        stream.read(&mut body).await.map_err(|error| {
+                            error!(
+                                "Failed to read VSR response body for TCP 
request with code: {code}: {error}",
+                                code = code,
+                                error = error
+                            );
+                            IggyError::Disconnected
+                        })?;
+                        response.put_slice(&body);
+                    }
 
-                if read_bytes != RESPONSE_INITIAL_BYTES_LENGTH {
-                    error!("Received an invalid or empty response.");
-                    return Err(IggyError::EmptyResponse);
+                    return crate::vsr::decode_response(&response.freeze());
                 }
 
-                let status = u32::from_le_bytes(
-                    response_buffer[..4]
-                        .try_into()
-                        .map_err(|_| IggyError::InvalidNumberEncoding)?,
-                );
-                let length = u32::from_le_bytes(
-                    response_buffer[4..]
-                        .try_into()
-                        .map_err(|_| IggyError::InvalidNumberEncoding)?,
-                );
-                return TcpClient::handle_response(status, length, 
stream).await;
+                #[cfg(not(feature = "vsr"))]
+                {
+                    let mut response_buffer = [0u8; 
RESPONSE_INITIAL_BYTES_LENGTH];
+                    let read_bytes = stream.read(&mut 
response_buffer).await.map_err(|error| {
+                        error!(
+                            "Failed to read response for TCP request with 
code: {code}: {error}",
+                            code = code,
+                            error = error
+                        );
+                        IggyError::Disconnected
+                    })?;
+
+                    if read_bytes != RESPONSE_INITIAL_BYTES_LENGTH {
+                        error!("Received an invalid or empty response.");
+                        return Err(IggyError::EmptyResponse);
+                    }
+
+                    let status = u32::from_le_bytes(
+                        response_buffer[..4]
+                            .try_into()
+                            .map_err(|_| IggyError::InvalidNumberEncoding)?,
+                    );
+                    let length = u32::from_le_bytes(
+                        response_buffer[4..]
+                            .try_into()
+                            .map_err(|_| IggyError::InvalidNumberEncoding)?,
+                    );
+                    return TcpClient::handle_response(status, length, 
stream).await;
+                }
             }
 
             error!("Cannot send data. Client is not connected.");
diff --git a/core/sdk/src/vsr.rs b/core/sdk/src/vsr.rs
new file mode 100644
index 000000000..c7fcb934e
--- /dev/null
+++ b/core/sdk/src/vsr.rs
@@ -0,0 +1,255 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::session::ConsensusSession;
+use bytes::{BufMut, Bytes, BytesMut};
+use iggy_binary_protocol::codec::WireDecode;
+use iggy_binary_protocol::codes::{
+    DELETE_CONSUMER_OFFSET_2_CODE, DELETE_CONSUMER_OFFSET_CODE, 
DELETE_SEGMENTS_CODE,
+    LOGIN_REGISTER_CODE, LOGIN_REGISTER_WITH_PAT_CODE, SEND_MESSAGES_CODE,
+    STORE_CONSUMER_OFFSET_2_CODE, STORE_CONSUMER_OFFSET_CODE,
+};
+use iggy_binary_protocol::consensus::{
+    Command2, HEADER_SIZE, Operation, ReplyHeader, RequestHeader, 
read_size_field,
+};
+use iggy_binary_protocol::requests::consumer_offsets::{
+    DeleteConsumerOffset2Request, DeleteConsumerOffsetRequest, 
StoreConsumerOffset2Request,
+    StoreConsumerOffsetRequest,
+};
+use iggy_binary_protocol::requests::messages::SendMessagesHeader;
+use iggy_binary_protocol::requests::segments::DeleteSegmentsRequest;
+use iggy_binary_protocol::{WireIdentifier, WirePartitioning};
+use iggy_common::sharding::IggyNamespace;
+use iggy_common::{IggyError, IggyTimestamp};
+
+pub(crate) fn encode_request(
+    session: &mut ConsensusSession,
+    code: u32,
+    payload: &Bytes,
+) -> Result<Bytes, IggyError> {
+    let (operation, request_id, session_id) = match code {
+        LOGIN_REGISTER_CODE | LOGIN_REGISTER_WITH_PAT_CODE => {
+            (Operation::Register, session.register_request_id(), 0)
+        }
+        _ => {
+            let operation = 
Operation::from_command_code(code).ok_or(IggyError::FeatureUnavailable)?;
+            let session_id = 
session.session().ok_or(IggyError::Unauthenticated)?;
+            (operation, session.next_request_id(), session_id)
+        }
+    };
+    let namespace = namespace_for_request(code, payload, operation)?;
+    let total_size = HEADER_SIZE
+        .checked_add(payload.len())
+        .ok_or(IggyError::InvalidConfiguration)?;
+    let header = RequestHeader {
+        command: Command2::Request,
+        operation,
+        size: total_size as u32,
+        client: session.client_id(),
+        request: request_id,
+        session: session_id,
+        namespace,
+        timestamp: IggyTimestamp::now().as_micros(),
+        ..Default::default()
+    };
+
+    let mut request = BytesMut::with_capacity(total_size);
+    request.put_slice(bytemuck::bytes_of(&header));
+    request.put_slice(payload);
+    Ok(request.freeze())
+}
+
+pub(crate) fn response_size(header: &[u8]) -> Result<usize, IggyError> {
+    let size = read_size_field(header).ok_or(IggyError::InvalidCommand)? as 
usize;
+    if size < HEADER_SIZE {
+        return Err(IggyError::InvalidCommand);
+    }
+    Ok(size)
+}
+
+pub(crate) fn decode_response(response: &[u8]) -> Result<Bytes, IggyError> {
+    if response.len() < HEADER_SIZE {
+        return Err(IggyError::EmptyResponse);
+    }
+
+    let header = 
bytemuck::checked::try_from_bytes::<ReplyHeader>(&response[..HEADER_SIZE])
+        .map_err(|_| IggyError::InvalidCommand)?;
+    if header.command != Command2::Reply {
+        return Err(IggyError::InvalidCommand);
+    }
+
+    let total_size = header.size as usize;
+    if total_size < HEADER_SIZE || response.len() < total_size {
+        return Err(IggyError::InvalidCommand);
+    }
+
+    Ok(Bytes::copy_from_slice(&response[HEADER_SIZE..total_size]))
+}
+
+fn namespace_for_request(
+    code: u32,
+    payload: &Bytes,
+    operation: Operation,
+) -> Result<u64, IggyError> {
+    // Control-plane requests do not target a concrete stream/topic/partition 
shard.
+    // They are encoded with namespace 0 and routed through metadata/shard 0.
+    if operation == Operation::Register || operation.is_metadata() {
+        return Ok(0);
+    }
+
+    let namespace = match code {
+        SEND_MESSAGES_CODE => {
+            if payload.len() < 4 {
+                return Err(IggyError::InvalidCommand);
+            }
+            let metadata_length =
+                u32::from_le_bytes(payload[..4].try_into().map_err(|_| 
IggyError::InvalidNumberEncoding)?)
+                    as usize;
+            if payload.len() < 4 + metadata_length {
+                return Err(IggyError::InvalidCommand);
+            }
+            let header = SendMessagesHeader::decode_from(&payload[4..4 + 
metadata_length])
+                .map_err(|_| IggyError::InvalidCommand)?;
+            namespace_from_partitioning(
+                &header.stream_id,
+                &header.topic_id,
+                &header.partitioning,
+            )?
+        }
+        STORE_CONSUMER_OFFSET_CODE => {
+            let request = StoreConsumerOffsetRequest::decode_from(payload)
+                .map_err(|_| IggyError::InvalidCommand)?;
+            namespace_from_partition(&request.stream_id, &request.topic_id, 
request.partition_id)?
+        }
+        DELETE_CONSUMER_OFFSET_CODE => {
+            let request = DeleteConsumerOffsetRequest::decode_from(payload)
+                .map_err(|_| IggyError::InvalidCommand)?;
+            namespace_from_partition(&request.stream_id, &request.topic_id, 
request.partition_id)?
+        }
+        STORE_CONSUMER_OFFSET_2_CODE => {
+            let request = StoreConsumerOffset2Request::decode_from(payload)
+                .map_err(|_| IggyError::InvalidCommand)?;
+            namespace_from_partition(&request.stream_id, &request.topic_id, 
request.partition_id)?
+        }
+        DELETE_CONSUMER_OFFSET_2_CODE => {
+            let request = DeleteConsumerOffset2Request::decode_from(payload)
+                .map_err(|_| IggyError::InvalidCommand)?;
+            namespace_from_partition(&request.stream_id, &request.topic_id, 
request.partition_id)?
+        }
+        DELETE_SEGMENTS_CODE => {
+            let request = DeleteSegmentsRequest::decode_from(payload)
+                .map_err(|_| IggyError::InvalidCommand)?;
+            namespace_from_partition(
+                &request.stream_id,
+                &request.topic_id,
+                Some(request.partition_id),
+            )?
+        }
+        _ => return Err(IggyError::FeatureUnavailable),
+    };
+
+    Ok(namespace)
+}
+
+fn namespace_from_partitioning(
+    stream_id: &WireIdentifier,
+    topic_id: &WireIdentifier,
+    partitioning: &WirePartitioning,
+) -> Result<u64, IggyError> {
+    let WirePartitioning::PartitionId(partition_id) = partitioning else {
+        return Err(IggyError::FeatureUnavailable);
+    };
+    namespace_from_partition(stream_id, topic_id, Some(*partition_id))
+}
+
+fn namespace_from_partition(
+    stream_id: &WireIdentifier,
+    topic_id: &WireIdentifier,
+    partition_id: Option<u32>,
+) -> Result<u64, IggyError> {
+    let stream_id = stream_id.as_u32().ok_or(IggyError::FeatureUnavailable)?;
+    let topic_id = topic_id.as_u32().ok_or(IggyError::FeatureUnavailable)?;
+    let partition_id = partition_id.ok_or(IggyError::FeatureUnavailable)?;
+    Ok(IggyNamespace::new(stream_id as usize, topic_id as usize, partition_id 
as usize).inner())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::session::ConsensusSession;
+    use iggy_binary_protocol::codes::{CREATE_STREAM_CODE, GET_STREAM_CODE};
+    use iggy_binary_protocol::consensus::{Message, RequestHeader, 
iobuf::Owned};
+    use iggy_binary_protocol::requests::streams::CreateStreamRequest;
+    use iggy_binary_protocol::requests::users::LoginRegisterRequest;
+    use iggy_binary_protocol::{WireEncode, WireName};
+    use secrecy::SecretString;
+
+    fn decode_request(bytes: &Bytes) -> Message<RequestHeader> {
+        
Message::try_from(Owned::<4096>::copy_from_slice(bytes.as_ref())).unwrap()
+    }
+
+    #[test]
+    fn register_request_uses_zero_request_and_session() {
+        let mut session = ConsensusSession::with_client_id(7);
+        let request = LoginRegisterRequest {
+            client_id: 7,
+            username: WireName::new("admin").unwrap(),
+            password: SecretString::from("secret"),
+            version: None,
+            client_context: None,
+        };
+
+        let bytes = encode_request(&mut session, LOGIN_REGISTER_CODE, 
&request.to_bytes()).unwrap();
+        let request = decode_request(&bytes);
+        let header = request.header();
+
+        assert_eq!(header.operation, Operation::Register);
+        assert_eq!(header.request, 0);
+        assert_eq!(header.session, 0);
+        assert_eq!(header.client, 7);
+        assert_eq!(header.namespace, 0);
+    }
+
+    #[test]
+    fn replicated_request_increments_request_counter() {
+        let mut session = ConsensusSession::with_client_id(42);
+        let _ = session.register_request_id();
+        session.bind(99);
+        let payload = CreateStreamRequest {
+            name: WireName::new("stream").unwrap(),
+        }
+        .to_bytes();
+
+        let first = encode_request(&mut session, CREATE_STREAM_CODE, 
&payload).unwrap();
+        let second = encode_request(&mut session, CREATE_STREAM_CODE, 
&payload).unwrap();
+
+        assert_eq!(decode_request(&first).header().request, 1);
+        assert_eq!(decode_request(&second).header().request, 2);
+        assert_eq!(decode_request(&second).header().session, 99);
+        assert_eq!(decode_request(&second).header().namespace, 0);
+    }
+
+    #[test]
+    fn unsupported_non_replicated_request_is_rejected() {
+        let mut session = ConsensusSession::new();
+        assert!(matches!(
+            encode_request(&mut session, GET_STREAM_CODE, &Bytes::new()),
+            Err(IggyError::FeatureUnavailable)
+        ));
+    }
+}
diff --git a/core/sdk/src/websocket/websocket_client.rs 
b/core/sdk/src/websocket/websocket_client.rs
index 5dd187a88..84a849066 100644
--- a/core/sdk/src/websocket/websocket_client.rs
+++ b/core/sdk/src/websocket/websocket_client.rs
@@ -17,6 +17,8 @@
  */
 
 use crate::leader_aware::{LeaderRedirectionState, 
check_and_redirect_to_leader};
+#[cfg(feature = "vsr")]
+use crate::session::ConsensusSession;
 use crate::websocket::websocket_connection_stream::WebSocketConnectionStream;
 use crate::websocket::websocket_stream_kind::WebSocketStreamKind;
 use 
crate::websocket::websocket_tls_connection_stream::WebSocketTlsConnectionStream;
@@ -28,9 +30,10 @@ use async_trait::async_trait;
 use bytes::{BufMut, Bytes, BytesMut};
 use iggy_common::{
     AutoLogin, ClientState, ConnectionString, Credentials, DiagnosticEvent, 
IggyDuration,
-    IggyError, IggyErrorDiscriminants, IggyTimestamp, WebSocketClientConfig,
-    WebSocketConnectionStringOptions,
+    IggyError, IggyTimestamp, WebSocketClientConfig, 
WebSocketConnectionStringOptions,
 };
+#[cfg(not(feature = "vsr"))]
+use iggy_common::IggyErrorDiscriminants;
 use iggy_common::{BinaryClient, BinaryTransport, PersonalAccessTokenClient, 
UserClient};
 use secrecy::ExposeSecret;
 use std::net::SocketAddr;
@@ -44,7 +47,9 @@ use tokio_tungstenite::{
 };
 use tracing::{debug, error, info, trace, warn};
 
+#[cfg(not(feature = "vsr"))]
 const REQUEST_INITIAL_BYTES_LENGTH: usize = 4;
+#[cfg(not(feature = "vsr"))]
 const RESPONSE_INITIAL_BYTES_LENGTH: usize = 8;
 const NAME: &str = "WebSocket";
 
@@ -58,6 +63,8 @@ pub struct WebSocketClient {
     pub(crate) connected_at: Mutex<Option<IggyTimestamp>>,
     leader_redirection_state: Mutex<LeaderRedirectionState>,
     pub(crate) current_server_address: Mutex<String>,
+    #[cfg(feature = "vsr")]
+    consensus_session: Arc<Mutex<ConsensusSession>>,
 }
 
 impl Default for WebSocketClient {
@@ -145,6 +152,32 @@ impl BinaryTransport for WebSocketClient {
     fn get_heartbeat_interval(&self) -> IggyDuration {
         self.config.heartbeat_interval
     }
+
+    #[cfg(feature = "vsr")]
+    async fn get_vsr_client_id(&self) -> Result<u128, IggyError> {
+        Ok(self.consensus_session.lock().await.client_id())
+    }
+
+    #[cfg(feature = "vsr")]
+    async fn bind_vsr_session(&self, session: u64) -> Result<(), IggyError> {
+        if session == 0 {
+            return Err(IggyError::InvalidConfiguration);
+        }
+
+        let mut consensus_session = self.consensus_session.lock().await;
+        if consensus_session.is_bound() {
+            return Err(IggyError::InvalidConfiguration);
+        }
+
+        consensus_session.bind(session);
+        Ok(())
+    }
+
+    #[cfg(feature = "vsr")]
+    async fn reset_vsr_session(&self) -> Result<(), IggyError> {
+        *self.consensus_session.lock().await = ConsensusSession::new();
+        Ok(())
+    }
 }
 
 impl BinaryClient for WebSocketClient {}
@@ -163,6 +196,8 @@ impl WebSocketClient {
             connected_at: Mutex::new(None),
             leader_redirection_state: 
Mutex::new(LeaderRedirectionState::new()),
             current_server_address: Mutex::new(server_address),
+            #[cfg(feature = "vsr")]
+            consensus_session: Arc::new(Mutex::new(ConsensusSession::new())),
         })
     }
 
@@ -520,6 +555,8 @@ impl WebSocketClient {
         self.set_state(ClientState::Disconnected).await;
 
         self.stream.lock().await.take();
+        #[cfg(feature = "vsr")]
+        self.reset_vsr_session().await?;
 
         self.publish_event(DiagnosticEvent::Disconnected).await;
         let now = IggyTimestamp::now();
@@ -542,6 +579,8 @@ impl WebSocketClient {
             let _ = stream.shutdown().await;
         }
 
+        #[cfg(feature = "vsr")]
+        self.reset_vsr_session().await?;
         self.set_state(ClientState::Shutdown).await;
         self.publish_event(DiagnosticEvent::Shutdown).await;
         info!("{NAME} client: {client_address} has been shutdown.");
@@ -571,10 +610,20 @@ impl WebSocketClient {
             IggyError::NotConnected
         })?;
 
+        #[cfg(feature = "vsr")]
+        let request = {
+            let mut consensus_session = self.consensus_session.lock().await;
+            crate::vsr::encode_request(&mut consensus_session, code, &payload)?
+        };
+        #[cfg(not(feature = "vsr"))]
         let payload_length = payload.len() + REQUEST_INITIAL_BYTES_LENGTH;
+        #[cfg(not(feature = "vsr"))]
         let mut request = BytesMut::with_capacity(4 + 
REQUEST_INITIAL_BYTES_LENGTH + payload.len());
+        #[cfg(not(feature = "vsr"))]
         request.put_u32_le(payload_length as u32);
+        #[cfg(not(feature = "vsr"))]
         request.put_u32_le(code);
+        #[cfg(not(feature = "vsr"))]
         request.put_slice(&payload);
 
         trace!(
@@ -586,61 +635,82 @@ impl WebSocketClient {
         stream.write(&request).await?;
         stream.flush().await?;
 
-        let mut response_initial_buffer = vec![0u8; 
RESPONSE_INITIAL_BYTES_LENGTH];
-        stream.read(&mut response_initial_buffer).await?;
-
-        let status = u32::from_le_bytes([
-            response_initial_buffer[0],
-            response_initial_buffer[1],
-            response_initial_buffer[2],
-            response_initial_buffer[3],
-        ]);
+        #[cfg(feature = "vsr")]
+        {
+            let mut response_header = vec![0u8; 
iggy_binary_protocol::HEADER_SIZE];
+            stream.read(&mut response_header).await?;
 
-        let length = u32::from_le_bytes([
-            response_initial_buffer[4],
-            response_initial_buffer[5],
-            response_initial_buffer[6],
-            response_initial_buffer[7],
-        ]) as usize;
+            let response_size = crate::vsr::response_size(&response_header)?;
+            let mut response = BytesMut::with_capacity(response_size);
+            response.put_slice(&response_header);
 
-        trace!(
-            "Received {NAME} response status: {}, length: {} bytes",
-            status, length
-        );
-
-        if status != 0 {
-            // TEMP: See https://github.com/apache/iggy/pull/604 for context.
-            if status == IggyErrorDiscriminants::TopicNameAlreadyExists as u32
-                || status == IggyErrorDiscriminants::StreamNameAlreadyExists 
as u32
-                || status == IggyErrorDiscriminants::UserAlreadyExists as u32
-                || status == 
IggyErrorDiscriminants::PersonalAccessTokenAlreadyExists as u32
-                || status == 
IggyErrorDiscriminants::ConsumerGroupNameAlreadyExists as u32
-            {
-                debug!(
-                    "Received a server resource already exists response: {} 
({})",
-                    status,
-                    IggyError::from_code_as_string(status)
-                )
-            } else {
-                error!(
-                    "Received an invalid response with status: {} ({}).",
-                    status,
-                    IggyError::from_code_as_string(status),
-                );
+            if response_size > iggy_binary_protocol::HEADER_SIZE {
+                let mut response_body = vec![0u8; response_size - 
iggy_binary_protocol::HEADER_SIZE];
+                stream.read(&mut response_body).await?;
+                response.put_slice(&response_body);
             }
 
-            return Err(IggyError::from_code(status));
+            return crate::vsr::decode_response(&response.freeze());
         }
 
-        if length == 0 {
-            return Ok(Bytes::new());
-        }
+        #[cfg(not(feature = "vsr"))]
+        {
+            let mut response_initial_buffer = vec![0u8; 
RESPONSE_INITIAL_BYTES_LENGTH];
+            stream.read(&mut response_initial_buffer).await?;
+
+            let status = u32::from_le_bytes([
+                response_initial_buffer[0],
+                response_initial_buffer[1],
+                response_initial_buffer[2],
+                response_initial_buffer[3],
+            ]);
+
+            let length = u32::from_le_bytes([
+                response_initial_buffer[4],
+                response_initial_buffer[5],
+                response_initial_buffer[6],
+                response_initial_buffer[7],
+            ]) as usize;
+
+            trace!(
+                "Received {NAME} response status: {}, length: {} bytes",
+                status, length
+            );
+
+            if status != 0 {
+                // TEMP: See https://github.com/apache/iggy/pull/604 for 
context.
+                if status == IggyErrorDiscriminants::TopicNameAlreadyExists as 
u32
+                    || status == 
IggyErrorDiscriminants::StreamNameAlreadyExists as u32
+                    || status == IggyErrorDiscriminants::UserAlreadyExists as 
u32
+                    || status == 
IggyErrorDiscriminants::PersonalAccessTokenAlreadyExists as u32
+                    || status == 
IggyErrorDiscriminants::ConsumerGroupNameAlreadyExists as u32
+                {
+                    debug!(
+                        "Received a server resource already exists response: 
{} ({})",
+                        status,
+                        IggyError::from_code_as_string(status)
+                    )
+                } else {
+                    error!(
+                        "Received an invalid response with status: {} ({}).",
+                        status,
+                        IggyError::from_code_as_string(status),
+                    );
+                }
+
+                return Err(IggyError::from_code(status));
+            }
+
+            if length == 0 {
+                return Ok(Bytes::new());
+            }
 
-        let mut response_buffer = vec![0u8; length];
-        stream.read(&mut response_buffer).await?;
+            let mut response_buffer = vec![0u8; length];
+            stream.read(&mut response_buffer).await?;
 
-        trace!("Received {NAME} response payload, size: {} bytes", length);
-        Ok(Bytes::from(response_buffer))
+            trace!("Received {NAME} response payload, size: {} bytes", length);
+            Ok(Bytes::from(response_buffer))
+        }
     }
 }
 

Reply via email to