This is an automated email from the ASF dual-hosted git repository. numinnex pushed a commit to branch integration_tests in repository https://gitbox.apache.org/repos/asf/iggy.git
commit ac96fb6910ac3ae11d51a2e34620392952cb3e08 Author: numinex <[email protected]> AuthorDate: Wed May 13 16:35:09 2026 +0200 progress --- Cargo.lock | 12 + Cargo.toml | 1 + core/common/Cargo.toml | 3 + .../traits/binary_impls/personal_access_tokens.rs | 65 +++++- core/common/src/traits/binary_impls/users.rs | 76 +++++- core/common/src/traits/binary_transport.rs | 15 ++ core/integration-vsr/Cargo.toml | 30 +++ .../sdk/mod.rs => integration-vsr/src/lib.rs} | 3 - .../sdk => integration-vsr/tests}/hello_world.rs | 7 +- core/integration/tests/sdk/mod.rs | 1 - core/sdk/Cargo.toml | 5 + core/sdk/src/lib.rs | 2 + core/sdk/src/quic/quic_client.rs | 111 +++++++-- core/sdk/src/tcp/tcp_client.rs | 150 ++++++++++-- core/sdk/src/vsr.rs | 255 +++++++++++++++++++++ core/sdk/src/websocket/websocket_client.rs | 168 ++++++++++---- 16 files changed, 795 insertions(+), 109 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fe4dfa0cf..0a6290bd8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6300,11 +6300,13 @@ dependencies = [ "async-dropper", "async-trait", "bon", + "bytemuck", "bytes", "dashmap", "flume 0.12.0", "futures", "futures-util", + "iggy_binary_protocol", "iggy_common", "mockall", "quinn", @@ -7080,6 +7082,16 @@ dependencies = [ "zip 8.6.0", ] +[[package]] +name = "integration-vsr" +version = "0.0.1" +dependencies = [ + "iggy", + "integration", + "serial_test", + "tokio", +] + [[package]] name = "interpolate_name" version = "0.2.4" diff --git a/Cargo.toml b/Cargo.toml index 6c8cb8434..5b06d5f05 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,7 @@ members = [ "core/consensus", "core/harness_derive", "core/integration", + "core/integration-vsr", "core/journal", "core/message_bus", "core/metadata", diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml index cb5aabe61..8b00608fd 100644 --- a/core/common/Cargo.toml +++ b/core/common/Cargo.toml @@ -27,6 +27,9 @@ documentation = "https://iggy.apache.org/docs" repository = "https://github.com/apache/iggy" readme = "README.md" +[features] +vsr = [] + [dependencies] aes-gcm = { workspace = true } aligned-vec = { workspace = true } diff --git a/core/common/src/traits/binary_impls/personal_access_tokens.rs b/core/common/src/traits/binary_impls/personal_access_tokens.rs index f85f34d36..4c740d79d 100644 --- a/core/common/src/traits/binary_impls/personal_access_tokens.rs +++ b/core/common/src/traits/binary_impls/personal_access_tokens.rs @@ -27,15 +27,27 @@ use iggy_binary_protocol::WireName; use iggy_binary_protocol::codec::WireEncode; use iggy_binary_protocol::codes::{ CREATE_PERSONAL_ACCESS_TOKEN_CODE, DELETE_PERSONAL_ACCESS_TOKEN_CODE, - GET_PERSONAL_ACCESS_TOKENS_CODE, LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE, + GET_PERSONAL_ACCESS_TOKENS_CODE, }; +#[cfg(feature = "vsr")] +use iggy_binary_protocol::codes::LOGIN_REGISTER_WITH_PAT_CODE; +#[cfg(not(feature = "vsr"))] +use iggy_binary_protocol::codes::LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE; use iggy_binary_protocol::requests::personal_access_tokens::{ - CreatePersonalAccessTokenRequest, DeletePersonalAccessTokenRequest, - GetPersonalAccessTokensRequest, LoginWithPersonalAccessTokenRequest, + CreatePersonalAccessTokenRequest, DeletePersonalAccessTokenRequest, GetPersonalAccessTokensRequest, }; +#[cfg(not(feature = "vsr"))] +use iggy_binary_protocol::requests::personal_access_tokens::LoginWithPersonalAccessTokenRequest; +#[cfg(feature = "vsr")] +use iggy_binary_protocol::requests::users::LoginRegisterWithPatRequest; use iggy_binary_protocol::responses::personal_access_tokens::create_personal_access_token::RawPersonalAccessTokenResponse; use iggy_binary_protocol::responses::personal_access_tokens::get_personal_access_tokens::GetPersonalAccessTokensResponse; +#[cfg(not(feature = "vsr"))] use iggy_binary_protocol::responses::users::login_user::IdentityResponse; +#[cfg(feature = "vsr")] +use iggy_binary_protocol::responses::users::LoginRegisterResponse; +#[cfg(feature = "vsr")] +use secrecy::SecretString; #[async_trait::async_trait] impl<B: BinaryClient> PersonalAccessTokenClient for B { @@ -90,16 +102,63 @@ impl<B: BinaryClient> PersonalAccessTokenClient for B { &self, token: &str, ) -> Result<IdentityInfo, IggyError> { + #[cfg(feature = "vsr")] + { + let client_id = self.get_vsr_client_id().await?; + let response = match self + .send_raw_with_response( + LOGIN_REGISTER_WITH_PAT_CODE, + LoginRegisterWithPatRequest { + client_id, + token: SecretString::from(token.to_string()), + version: Some(env!("CARGO_PKG_VERSION").to_string()), + client_context: Some(String::new()), + } + .to_bytes(), + ) + .await + { + Ok(response) => response, + Err(error) => { + self.reset_vsr_session().await?; + return Err(error); + } + }; + let wire_resp = match super::decode_response::<LoginRegisterResponse>(&response) { + Ok(wire_resp) => wire_resp, + Err(error) => { + self.reset_vsr_session().await?; + return Err(error); + } + }; + if let Err(error) = self.bind_vsr_session(wire_resp.session).await { + self.reset_vsr_session().await?; + return Err(error); + } + self.set_state(ClientState::Authenticated).await; + self.publish_event(DiagnosticEvent::SignedIn).await; + return Ok(IdentityInfo { + user_id: wire_resp.user_id, + access_token: None, + }); + } + + #[cfg(not(feature = "vsr"))] let wire_token = WireName::new(token).map_err(|_| IggyError::InvalidFormat)?; + #[cfg(not(feature = "vsr"))] let response = self .send_raw_with_response( LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE, LoginWithPersonalAccessTokenRequest { token: wire_token }.to_bytes(), ) .await?; + #[cfg(not(feature = "vsr"))] self.set_state(ClientState::Authenticated).await; + #[cfg(not(feature = "vsr"))] self.publish_event(DiagnosticEvent::SignedIn).await; + #[cfg(not(feature = "vsr"))] let wire_resp = super::decode_response::<IdentityResponse>(&response)?; + #[cfg(not(feature = "vsr"))] Ok(IdentityInfo::from(wire_resp)) } } diff --git a/core/common/src/traits/binary_impls/users.rs b/core/common/src/traits/binary_impls/users.rs index 24a877a4c..183d942ca 100644 --- a/core/common/src/traits/binary_impls/users.rs +++ b/core/common/src/traits/binary_impls/users.rs @@ -26,14 +26,27 @@ use iggy_binary_protocol::WireName; use iggy_binary_protocol::codec::WireEncode; use iggy_binary_protocol::codes::{ CHANGE_PASSWORD_CODE, CREATE_USER_CODE, DELETE_USER_CODE, GET_USER_CODE, GET_USERS_CODE, - LOGIN_USER_CODE, LOGOUT_USER_CODE, UPDATE_PERMISSIONS_CODE, UPDATE_USER_CODE, + UPDATE_PERMISSIONS_CODE, UPDATE_USER_CODE, }; +#[cfg(feature = "vsr")] +use iggy_binary_protocol::codes::LOGIN_REGISTER_CODE; +#[cfg(not(feature = "vsr"))] +use iggy_binary_protocol::codes::{LOGIN_USER_CODE, LOGOUT_USER_CODE}; use iggy_binary_protocol::requests::users::{ ChangePasswordRequest, CreateUserRequest, DeleteUserRequest, GetUserRequest, GetUsersRequest, - LoginUserRequest, LogoutUserRequest, UpdatePermissionsRequest, UpdateUserRequest, + UpdatePermissionsRequest, UpdateUserRequest, }; +#[cfg(feature = "vsr")] +use iggy_binary_protocol::requests::users::LoginRegisterRequest; +#[cfg(not(feature = "vsr"))] +use iggy_binary_protocol::requests::users::{LoginUserRequest, LogoutUserRequest}; +#[cfg(feature = "vsr")] +use iggy_binary_protocol::responses::users::LoginRegisterResponse; +#[cfg(not(feature = "vsr"))] use iggy_binary_protocol::responses::users::login_user::IdentityResponse; use iggy_binary_protocol::responses::users::{GetUsersResponse, UserDetailsResponse}; +#[cfg(feature = "vsr")] +use secrecy::SecretString; #[async_trait::async_trait] impl<B: BinaryClient> UserClient for B { @@ -169,7 +182,52 @@ impl<B: BinaryClient> UserClient for B { } async fn login_user(&self, username: &str, password: &str) -> Result<IdentityInfo, IggyError> { + #[cfg(feature = "vsr")] + { + let wire_name = WireName::new(username).map_err(|_| IggyError::InvalidFormat)?; + let client_id = self.get_vsr_client_id().await?; + let response = match self + .send_raw_with_response( + LOGIN_REGISTER_CODE, + LoginRegisterRequest { + client_id, + username: wire_name, + password: SecretString::from(password.to_string()), + version: Some(env!("CARGO_PKG_VERSION").to_string()), + client_context: Some(String::new()), + } + .to_bytes(), + ) + .await + { + Ok(response) => response, + Err(error) => { + self.reset_vsr_session().await?; + return Err(error); + } + }; + let wire_resp = match super::decode_response::<LoginRegisterResponse>(&response) { + Ok(wire_resp) => wire_resp, + Err(error) => { + self.reset_vsr_session().await?; + return Err(error); + } + }; + if let Err(error) = self.bind_vsr_session(wire_resp.session).await { + self.reset_vsr_session().await?; + return Err(error); + } + self.set_state(ClientState::Authenticated).await; + self.publish_event(DiagnosticEvent::SignedIn).await; + return Ok(IdentityInfo { + user_id: wire_resp.user_id, + access_token: None, + }); + } + + #[cfg(not(feature = "vsr"))] let wire_name = WireName::new(username).map_err(|_| IggyError::InvalidFormat)?; + #[cfg(not(feature = "vsr"))] let response = self .send_raw_with_response( LOGIN_USER_CODE, @@ -182,18 +240,32 @@ impl<B: BinaryClient> UserClient for B { .to_bytes(), ) .await?; + #[cfg(not(feature = "vsr"))] self.set_state(ClientState::Authenticated).await; + #[cfg(not(feature = "vsr"))] self.publish_event(DiagnosticEvent::SignedIn).await; + #[cfg(not(feature = "vsr"))] let wire_resp = super::decode_response::<IdentityResponse>(&response)?; + #[cfg(not(feature = "vsr"))] Ok(IdentityInfo::from(wire_resp)) } async fn logout_user(&self) -> Result<(), IggyError> { + #[cfg(feature = "vsr")] + { + return Err(IggyError::FeatureUnavailable); + } + + #[cfg(not(feature = "vsr"))] fail_if_not_authenticated(self).await?; + #[cfg(not(feature = "vsr"))] self.send_raw_with_response(LOGOUT_USER_CODE, LogoutUserRequest.to_bytes()) .await?; + #[cfg(not(feature = "vsr"))] self.set_state(ClientState::Connected).await; + #[cfg(not(feature = "vsr"))] self.publish_event(DiagnosticEvent::SignedOut).await; + #[cfg(not(feature = "vsr"))] Ok(()) } } diff --git a/core/common/src/traits/binary_transport.rs b/core/common/src/traits/binary_transport.rs index 4f6af3ba8..99dd73393 100644 --- a/core/common/src/traits/binary_transport.rs +++ b/core/common/src/traits/binary_transport.rs @@ -29,4 +29,19 @@ pub trait BinaryTransport { async fn publish_event(&self, event: DiagnosticEvent); async fn send_raw_with_response(&self, code: u32, payload: Bytes) -> Result<Bytes, IggyError>; fn get_heartbeat_interval(&self) -> IggyDuration; + + #[cfg(feature = "vsr")] + async fn get_vsr_client_id(&self) -> Result<u128, IggyError> { + Err(IggyError::FeatureUnavailable) + } + + #[cfg(feature = "vsr")] + async fn bind_vsr_session(&self, _session: u64) -> Result<(), IggyError> { + Err(IggyError::FeatureUnavailable) + } + + #[cfg(feature = "vsr")] + async fn reset_vsr_session(&self) -> Result<(), IggyError> { + Err(IggyError::FeatureUnavailable) + } } diff --git a/core/integration-vsr/Cargo.toml b/core/integration-vsr/Cargo.toml new file mode 100644 index 000000000..4f956b2b9 --- /dev/null +++ b/core/integration-vsr/Cargo.toml @@ -0,0 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "integration-vsr" +version = "0.0.1" +edition = "2024" +license = "Apache-2.0" +publish = false + +[dependencies] +iggy = { workspace = true, features = ["vsr"] } +integration = { path = "../integration" } +serial_test = { workspace = true } +tokio = { workspace = true, features = ["full", "test-util"] } + diff --git a/core/integration/tests/sdk/mod.rs b/core/integration-vsr/src/lib.rs similarity index 96% copy from core/integration/tests/sdk/mod.rs copy to core/integration-vsr/src/lib.rs index 05acd11c6..31bd66e6e 100644 --- a/core/integration/tests/sdk/mod.rs +++ b/core/integration-vsr/src/lib.rs @@ -15,6 +15,3 @@ * specific language governing permissions and limitations * under the License. */ - -mod hello_world; -mod producer; diff --git a/core/integration/tests/sdk/hello_world.rs b/core/integration-vsr/tests/hello_world.rs similarity index 86% rename from core/integration/tests/sdk/hello_world.rs rename to core/integration-vsr/tests/hello_world.rs index 42d5a25ba..bcb94f461 100644 --- a/core/integration/tests/sdk/hello_world.rs +++ b/core/integration-vsr/tests/hello_world.rs @@ -24,6 +24,9 @@ use integration::iggy_harness; server(executable_path = "iggy-server-ng") )] async fn hello_world(harness: &TestHarness) { - let client = harness.root_client().await.unwrap(); - client.ping().await.unwrap(); + let client = harness.new_client().await.unwrap(); + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); } diff --git a/core/integration/tests/sdk/mod.rs b/core/integration/tests/sdk/mod.rs index 05acd11c6..6d94bfa61 100644 --- a/core/integration/tests/sdk/mod.rs +++ b/core/integration/tests/sdk/mod.rs @@ -16,5 +16,4 @@ * under the License. */ -mod hello_world; mod producer; diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml index 2056ae56d..091779e22 100644 --- a/core/sdk/Cargo.toml +++ b/core/sdk/Cargo.toml @@ -28,16 +28,21 @@ documentation = "https://iggy.apache.org/docs" repository = "https://github.com/apache/iggy" readme = "README.md" +[features] +vsr = ["iggy_common/vsr"] + [dependencies] async-broadcast = { workspace = true } async-dropper = { workspace = true } async-trait = { workspace = true } bon = { workspace = true } +bytemuck = { workspace = true } bytes = { workspace = true } dashmap = { workspace = true } flume = { workspace = true } futures = { workspace = true } futures-util = { workspace = true } +iggy_binary_protocol = { workspace = true } iggy_common = { workspace = true } quinn = { workspace = true } reqwest = { workspace = true } diff --git a/core/sdk/src/lib.rs b/core/sdk/src/lib.rs index 35f0e1b73..9e738728e 100644 --- a/core/sdk/src/lib.rs +++ b/core/sdk/src/lib.rs @@ -28,4 +28,6 @@ pub mod quic; pub mod session; pub mod stream_builder; pub mod tcp; +#[cfg(feature = "vsr")] +mod vsr; pub mod websocket; diff --git a/core/sdk/src/quic/quic_client.rs b/core/sdk/src/quic/quic_client.rs index 0824e962a..7217bc319 100644 --- a/core/sdk/src/quic/quic_client.rs +++ b/core/sdk/src/quic/quic_client.rs @@ -18,6 +18,8 @@ use crate::leader_aware::{LeaderRedirectionState, check_and_redirect_to_leader}; use crate::prelude::AutoLogin; +#[cfg(feature = "vsr")] +use crate::session::ConsensusSession; use iggy_common::{BinaryClient, BinaryTransport, Client, PersonalAccessTokenClient, UserClient}; use crate::prelude::{IggyDuration, IggyError, IggyTimestamp, QuicClientConfig}; @@ -41,7 +43,9 @@ use tokio::sync::Mutex; use tokio::time::sleep; use tracing::{error, info, trace, warn}; +#[cfg(not(feature = "vsr"))] const REQUEST_INITIAL_BYTES_LENGTH: usize = 4; +#[cfg(not(feature = "vsr"))] const RESPONSE_INITIAL_BYTES_LENGTH: usize = 8; const NAME: &str = "Iggy"; @@ -56,6 +60,8 @@ pub struct QuicClient { pub(crate) connected_at: Mutex<Option<IggyTimestamp>>, leader_redirection_state: Mutex<LeaderRedirectionState>, pub(crate) current_server_address: Mutex<String>, + #[cfg(feature = "vsr")] + consensus_session: Arc<Mutex<ConsensusSession>>, } unsafe impl Send for QuicClient {} @@ -139,6 +145,32 @@ impl BinaryTransport for QuicClient { fn get_heartbeat_interval(&self) -> IggyDuration { self.config.heartbeat_interval } + + #[cfg(feature = "vsr")] + async fn get_vsr_client_id(&self) -> Result<u128, IggyError> { + Ok(self.consensus_session.lock().await.client_id()) + } + + #[cfg(feature = "vsr")] + async fn bind_vsr_session(&self, session: u64) -> Result<(), IggyError> { + if session == 0 { + return Err(IggyError::InvalidConfiguration); + } + + let mut consensus_session = self.consensus_session.lock().await; + if consensus_session.is_bound() { + return Err(IggyError::InvalidConfiguration); + } + + consensus_session.bind(session); + Ok(()) + } + + #[cfg(feature = "vsr")] + async fn reset_vsr_session(&self) -> Result<(), IggyError> { + *self.consensus_session.lock().await = ConsensusSession::new(); + Ok(()) + } } impl BinaryClient for QuicClient {} @@ -205,6 +237,8 @@ impl QuicClient { connected_at: Mutex::new(None), leader_redirection_state: Mutex::new(LeaderRedirectionState::new()), current_server_address: Mutex::new(server_address), + #[cfg(feature = "vsr")] + consensus_session: Arc::new(Mutex::new(ConsensusSession::new())), }) } @@ -234,34 +268,43 @@ impl QuicClient { return Err(IggyError::EmptyResponse); } - let status = u32::from_le_bytes( - buffer[..4] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ); - if status != 0 { - error!( - "Received an invalid response with status: {} ({}).", - status, - IggyError::from_code_as_string(status) + #[cfg(feature = "vsr")] + { + return crate::vsr::decode_response(&buffer); + } + + #[cfg(not(feature = "vsr"))] + { + let status = u32::from_le_bytes( + buffer[..4] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, ); + if status != 0 { + error!( + "Received an invalid response with status: {} ({}).", + status, + IggyError::from_code_as_string(status) + ); - return Err(IggyError::from_code(status)); - } + return Err(IggyError::from_code(status)); + } - let length = u32::from_le_bytes( - buffer[4..RESPONSE_INITIAL_BYTES_LENGTH] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ); - trace!("Status: OK. Response length: {}", length); - if length <= 1 { - return Ok(Bytes::new()); - } + let length = u32::from_le_bytes( + buffer[4..RESPONSE_INITIAL_BYTES_LENGTH] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, + ); + trace!("Status: OK. Response length: {}", length); + if length <= 1 { + return Ok(Bytes::new()); + } - Ok(Bytes::copy_from_slice( - &buffer[RESPONSE_INITIAL_BYTES_LENGTH..RESPONSE_INITIAL_BYTES_LENGTH + length as usize], - )) + Ok(Bytes::copy_from_slice( + &buffer[RESPONSE_INITIAL_BYTES_LENGTH + ..RESPONSE_INITIAL_BYTES_LENGTH + length as usize], + )) + } } async fn connect(&self) -> Result<(), IggyError> { @@ -469,6 +512,8 @@ impl QuicClient { } self.endpoint.wait_idle().await; + #[cfg(feature = "vsr")] + self.reset_vsr_session().await?; self.set_state(ClientState::Shutdown).await; self.publish_event(DiagnosticEvent::Shutdown).await; info!("{NAME} QUIC client has been shutdown."); @@ -487,6 +532,8 @@ impl QuicClient { self.set_state(ClientState::Disconnected).await; self.connection.lock().await.take(); self.endpoint.wait_idle().await; + #[cfg(feature = "vsr")] + self.reset_vsr_session().await?; self.publish_event(DiagnosticEvent::Disconnected).await; let now = IggyTimestamp::now(); info!( @@ -521,26 +568,42 @@ impl QuicClient { let connection = self.connection.clone(); let response_buffer_size = self.config.response_buffer_size; + #[cfg(feature = "vsr")] + let consensus_session = self.consensus_session.clone(); // SAFETY: we run code holding the `connection` lock in a task so we can't be cancelled while holding the lock. tokio::spawn(async move { let connection = connection.lock().await; if let Some(connection) = connection.as_ref() { + #[cfg(feature = "vsr")] + let request = { + let mut consensus_session = consensus_session.lock().await; + crate::vsr::encode_request(&mut consensus_session, code, &payload)? + }; + #[cfg(not(feature = "vsr"))] let payload_length = payload.len() + REQUEST_INITIAL_BYTES_LENGTH; let (mut send, mut recv) = connection.open_bi().await.map_err(|error| { error!("Failed to open a bidirectional stream: {error}"); IggyError::QuicError })?; trace!("Sending a QUIC request with code: {code}"); + #[cfg(feature = "vsr")] + send.write_all(&request).await.map_err(|error| { + error!("Failed to write VSR request: {error}"); + IggyError::QuicError + })?; + #[cfg(not(feature = "vsr"))] send.write_all(&(payload_length as u32).to_le_bytes()) .await .map_err(|error| { error!("Failed to write payload length: {error}"); IggyError::QuicError })?; + #[cfg(not(feature = "vsr"))] send.write_all(&code.to_le_bytes()).await.map_err(|error| { error!("Failed to write payload code: {error}"); IggyError::QuicError })?; + #[cfg(not(feature = "vsr"))] send.write_all(&payload).await.map_err(|error| { error!("Failed to write payload: {error}"); IggyError::QuicError diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs index 059ca4d02..5a1a1502e 100644 --- a/core/sdk/src/tcp/tcp_client.rs +++ b/core/sdk/src/tcp/tcp_client.rs @@ -19,6 +19,8 @@ use crate::leader_aware::{LeaderRedirectionState, check_and_redirect_to_leader}; use crate::prelude::Client; use crate::prelude::TcpClientConfig; +#[cfg(feature = "vsr")] +use crate::session::ConsensusSession; use crate::tcp::tcp_connection_stream::TcpConnectionStream; use crate::tcp::tcp_connection_stream_kind::ConnectionStreamKind; use crate::tcp::tcp_tls_connection_stream::TcpTlsConnectionStream; @@ -27,9 +29,10 @@ use async_trait::async_trait; use bytes::{BufMut, Bytes, BytesMut}; use iggy_common::{ AutoLogin, ClientState, ConnectionString, ConnectionStringUtils, Credentials, DiagnosticEvent, - IggyDuration, IggyError, IggyErrorDiscriminants, IggyTimestamp, TcpConnectionStringOptions, - TransportProtocol, + IggyDuration, IggyError, IggyTimestamp, TcpConnectionStringOptions, TransportProtocol, }; +#[cfg(not(feature = "vsr"))] +use iggy_common::IggyErrorDiscriminants; use iggy_common::{BinaryClient, BinaryTransport, PersonalAccessTokenClient, UserClient}; use rustls::pki_types::{CertificateDer, ServerName, pem::PemObject}; use secrecy::ExposeSecret; @@ -42,7 +45,9 @@ use tokio::time::sleep; use tokio_rustls::{TlsConnector, TlsStream}; use tracing::{error, info, trace, warn}; +#[cfg(not(feature = "vsr"))] const REQUEST_INITIAL_BYTES_LENGTH: usize = 4; +#[cfg(not(feature = "vsr"))] const RESPONSE_INITIAL_BYTES_LENGTH: usize = 8; const NAME: &str = "Iggy"; @@ -58,6 +63,8 @@ pub struct TcpClient { pub(crate) connected_at: Mutex<Option<IggyTimestamp>>, leader_redirection_state: Mutex<LeaderRedirectionState>, pub(crate) current_server_address: Mutex<String>, + #[cfg(feature = "vsr")] + consensus_session: Arc<Mutex<ConsensusSession>>, } impl Default for TcpClient { @@ -144,6 +151,32 @@ impl BinaryTransport for TcpClient { fn get_heartbeat_interval(&self) -> IggyDuration { self.config.heartbeat_interval } + + #[cfg(feature = "vsr")] + async fn get_vsr_client_id(&self) -> Result<u128, IggyError> { + Ok(self.consensus_session.lock().await.client_id()) + } + + #[cfg(feature = "vsr")] + async fn bind_vsr_session(&self, session: u64) -> Result<(), IggyError> { + if session == 0 { + return Err(IggyError::InvalidConfiguration); + } + + let mut consensus_session = self.consensus_session.lock().await; + if consensus_session.is_bound() { + return Err(IggyError::InvalidConfiguration); + } + + consensus_session.bind(session); + Ok(()) + } + + #[cfg(feature = "vsr")] + async fn reset_vsr_session(&self) -> Result<(), IggyError> { + *self.consensus_session.lock().await = ConsensusSession::new(); + Ok(()) + } } impl BinaryClient for TcpClient {} @@ -203,9 +236,12 @@ impl TcpClient { connected_at: Mutex::new(None), leader_redirection_state: Mutex::new(LeaderRedirectionState::new()), current_server_address: Mutex::new(server_address), + #[cfg(feature = "vsr")] + consensus_session: Arc::new(Mutex::new(ConsensusSession::new())), }) } + #[cfg(not(feature = "vsr"))] async fn handle_response( status: u32, length: u32, @@ -510,6 +546,8 @@ impl TcpClient { info!("{NAME} client: {client_address} is disconnecting from server..."); self.set_state(ClientState::Disconnected).await; self.stream.lock().await.take(); + #[cfg(feature = "vsr")] + self.reset_vsr_session().await?; self.publish_event(DiagnosticEvent::Disconnected).await; let now = IggyTimestamp::now(); info!("{NAME} client: {client_address} has disconnected from server at: {now}."); @@ -527,6 +565,8 @@ impl TcpClient { if let Some(mut stream) = stream { stream.shutdown().await?; } + #[cfg(feature = "vsr")] + self.reset_vsr_session().await?; self.set_state(ClientState::Shutdown).await; self.publish_event(DiagnosticEvent::Shutdown).await; info!("{NAME} TCP client: {client_address} has been shutdown."); @@ -551,43 +591,103 @@ impl TcpClient { } let stream = self.stream.clone(); + #[cfg(feature = "vsr")] + let consensus_session = self.consensus_session.clone(); // SAFETY: we run code holding the `stream` lock in a task so we can't be cancelled while holding the lock. tokio::spawn(async move { let mut stream = stream.lock().await; if let Some(stream) = stream.as_mut() { + #[cfg(feature = "vsr")] + let request = { + let mut consensus_session = consensus_session.lock().await; + crate::vsr::encode_request(&mut consensus_session, code, &payload)? + }; + #[cfg(not(feature = "vsr"))] let payload_length = payload.len() + REQUEST_INITIAL_BYTES_LENGTH; + #[cfg(feature = "vsr")] + trace!( + "Sending a TCP VSR request of size {} with code: {code}", + request.len() + ); + #[cfg(not(feature = "vsr"))] trace!("Sending a TCP request of size {payload_length} with code: {code}"); + #[cfg(feature = "vsr")] + stream.write(&request).await?; + #[cfg(not(feature = "vsr"))] stream.write(&(payload_length as u32).to_le_bytes()).await?; + #[cfg(not(feature = "vsr"))] stream.write(&code.to_le_bytes()).await?; + #[cfg(not(feature = "vsr"))] stream.write(&payload).await?; stream.flush().await?; trace!("Sent a TCP request with code: {code}, waiting for a response..."); - let mut response_buffer = [0u8; RESPONSE_INITIAL_BYTES_LENGTH]; - let read_bytes = stream.read(&mut response_buffer).await.map_err(|error| { - error!( - "Failed to read response for TCP request with code: {code}: {error}", - code = code, - error = error - ); - IggyError::Disconnected - })?; + #[cfg(feature = "vsr")] + { + let mut response_header = [0u8; iggy_binary_protocol::HEADER_SIZE]; + let read_bytes = stream.read(&mut response_header).await.map_err(|error| { + error!( + "Failed to read VSR response header for TCP request with code: {code}: {error}", + code = code, + error = error + ); + IggyError::Disconnected + })?; + + if read_bytes != iggy_binary_protocol::HEADER_SIZE { + error!("Received an invalid or empty VSR response header."); + return Err(IggyError::EmptyResponse); + } + + let response_size = crate::vsr::response_size(&response_header)?; + let mut response = BytesMut::with_capacity(response_size); + response.put_slice(&response_header); + + if response_size > iggy_binary_protocol::HEADER_SIZE { + let body_size = response_size - iggy_binary_protocol::HEADER_SIZE; + let mut body = vec![0u8; body_size]; + stream.read(&mut body).await.map_err(|error| { + error!( + "Failed to read VSR response body for TCP request with code: {code}: {error}", + code = code, + error = error + ); + IggyError::Disconnected + })?; + response.put_slice(&body); + } - if read_bytes != RESPONSE_INITIAL_BYTES_LENGTH { - error!("Received an invalid or empty response."); - return Err(IggyError::EmptyResponse); + return crate::vsr::decode_response(&response.freeze()); } - let status = u32::from_le_bytes( - response_buffer[..4] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ); - let length = u32::from_le_bytes( - response_buffer[4..] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ); - return TcpClient::handle_response(status, length, stream).await; + #[cfg(not(feature = "vsr"))] + { + let mut response_buffer = [0u8; RESPONSE_INITIAL_BYTES_LENGTH]; + let read_bytes = stream.read(&mut response_buffer).await.map_err(|error| { + error!( + "Failed to read response for TCP request with code: {code}: {error}", + code = code, + error = error + ); + IggyError::Disconnected + })?; + + if read_bytes != RESPONSE_INITIAL_BYTES_LENGTH { + error!("Received an invalid or empty response."); + return Err(IggyError::EmptyResponse); + } + + let status = u32::from_le_bytes( + response_buffer[..4] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, + ); + let length = u32::from_le_bytes( + response_buffer[4..] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, + ); + return TcpClient::handle_response(status, length, stream).await; + } } error!("Cannot send data. Client is not connected."); diff --git a/core/sdk/src/vsr.rs b/core/sdk/src/vsr.rs new file mode 100644 index 000000000..c7fcb934e --- /dev/null +++ b/core/sdk/src/vsr.rs @@ -0,0 +1,255 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::session::ConsensusSession; +use bytes::{BufMut, Bytes, BytesMut}; +use iggy_binary_protocol::codec::WireDecode; +use iggy_binary_protocol::codes::{ + DELETE_CONSUMER_OFFSET_2_CODE, DELETE_CONSUMER_OFFSET_CODE, DELETE_SEGMENTS_CODE, + LOGIN_REGISTER_CODE, LOGIN_REGISTER_WITH_PAT_CODE, SEND_MESSAGES_CODE, + STORE_CONSUMER_OFFSET_2_CODE, STORE_CONSUMER_OFFSET_CODE, +}; +use iggy_binary_protocol::consensus::{ + Command2, HEADER_SIZE, Operation, ReplyHeader, RequestHeader, read_size_field, +}; +use iggy_binary_protocol::requests::consumer_offsets::{ + DeleteConsumerOffset2Request, DeleteConsumerOffsetRequest, StoreConsumerOffset2Request, + StoreConsumerOffsetRequest, +}; +use iggy_binary_protocol::requests::messages::SendMessagesHeader; +use iggy_binary_protocol::requests::segments::DeleteSegmentsRequest; +use iggy_binary_protocol::{WireIdentifier, WirePartitioning}; +use iggy_common::sharding::IggyNamespace; +use iggy_common::{IggyError, IggyTimestamp}; + +pub(crate) fn encode_request( + session: &mut ConsensusSession, + code: u32, + payload: &Bytes, +) -> Result<Bytes, IggyError> { + let (operation, request_id, session_id) = match code { + LOGIN_REGISTER_CODE | LOGIN_REGISTER_WITH_PAT_CODE => { + (Operation::Register, session.register_request_id(), 0) + } + _ => { + let operation = Operation::from_command_code(code).ok_or(IggyError::FeatureUnavailable)?; + let session_id = session.session().ok_or(IggyError::Unauthenticated)?; + (operation, session.next_request_id(), session_id) + } + }; + let namespace = namespace_for_request(code, payload, operation)?; + let total_size = HEADER_SIZE + .checked_add(payload.len()) + .ok_or(IggyError::InvalidConfiguration)?; + let header = RequestHeader { + command: Command2::Request, + operation, + size: total_size as u32, + client: session.client_id(), + request: request_id, + session: session_id, + namespace, + timestamp: IggyTimestamp::now().as_micros(), + ..Default::default() + }; + + let mut request = BytesMut::with_capacity(total_size); + request.put_slice(bytemuck::bytes_of(&header)); + request.put_slice(payload); + Ok(request.freeze()) +} + +pub(crate) fn response_size(header: &[u8]) -> Result<usize, IggyError> { + let size = read_size_field(header).ok_or(IggyError::InvalidCommand)? as usize; + if size < HEADER_SIZE { + return Err(IggyError::InvalidCommand); + } + Ok(size) +} + +pub(crate) fn decode_response(response: &[u8]) -> Result<Bytes, IggyError> { + if response.len() < HEADER_SIZE { + return Err(IggyError::EmptyResponse); + } + + let header = bytemuck::checked::try_from_bytes::<ReplyHeader>(&response[..HEADER_SIZE]) + .map_err(|_| IggyError::InvalidCommand)?; + if header.command != Command2::Reply { + return Err(IggyError::InvalidCommand); + } + + let total_size = header.size as usize; + if total_size < HEADER_SIZE || response.len() < total_size { + return Err(IggyError::InvalidCommand); + } + + Ok(Bytes::copy_from_slice(&response[HEADER_SIZE..total_size])) +} + +fn namespace_for_request( + code: u32, + payload: &Bytes, + operation: Operation, +) -> Result<u64, IggyError> { + // Control-plane requests do not target a concrete stream/topic/partition shard. + // They are encoded with namespace 0 and routed through metadata/shard 0. + if operation == Operation::Register || operation.is_metadata() { + return Ok(0); + } + + let namespace = match code { + SEND_MESSAGES_CODE => { + if payload.len() < 4 { + return Err(IggyError::InvalidCommand); + } + let metadata_length = + u32::from_le_bytes(payload[..4].try_into().map_err(|_| IggyError::InvalidNumberEncoding)?) + as usize; + if payload.len() < 4 + metadata_length { + return Err(IggyError::InvalidCommand); + } + let header = SendMessagesHeader::decode_from(&payload[4..4 + metadata_length]) + .map_err(|_| IggyError::InvalidCommand)?; + namespace_from_partitioning( + &header.stream_id, + &header.topic_id, + &header.partitioning, + )? + } + STORE_CONSUMER_OFFSET_CODE => { + let request = StoreConsumerOffsetRequest::decode_from(payload) + .map_err(|_| IggyError::InvalidCommand)?; + namespace_from_partition(&request.stream_id, &request.topic_id, request.partition_id)? + } + DELETE_CONSUMER_OFFSET_CODE => { + let request = DeleteConsumerOffsetRequest::decode_from(payload) + .map_err(|_| IggyError::InvalidCommand)?; + namespace_from_partition(&request.stream_id, &request.topic_id, request.partition_id)? + } + STORE_CONSUMER_OFFSET_2_CODE => { + let request = StoreConsumerOffset2Request::decode_from(payload) + .map_err(|_| IggyError::InvalidCommand)?; + namespace_from_partition(&request.stream_id, &request.topic_id, request.partition_id)? + } + DELETE_CONSUMER_OFFSET_2_CODE => { + let request = DeleteConsumerOffset2Request::decode_from(payload) + .map_err(|_| IggyError::InvalidCommand)?; + namespace_from_partition(&request.stream_id, &request.topic_id, request.partition_id)? + } + DELETE_SEGMENTS_CODE => { + let request = DeleteSegmentsRequest::decode_from(payload) + .map_err(|_| IggyError::InvalidCommand)?; + namespace_from_partition( + &request.stream_id, + &request.topic_id, + Some(request.partition_id), + )? + } + _ => return Err(IggyError::FeatureUnavailable), + }; + + Ok(namespace) +} + +fn namespace_from_partitioning( + stream_id: &WireIdentifier, + topic_id: &WireIdentifier, + partitioning: &WirePartitioning, +) -> Result<u64, IggyError> { + let WirePartitioning::PartitionId(partition_id) = partitioning else { + return Err(IggyError::FeatureUnavailable); + }; + namespace_from_partition(stream_id, topic_id, Some(*partition_id)) +} + +fn namespace_from_partition( + stream_id: &WireIdentifier, + topic_id: &WireIdentifier, + partition_id: Option<u32>, +) -> Result<u64, IggyError> { + let stream_id = stream_id.as_u32().ok_or(IggyError::FeatureUnavailable)?; + let topic_id = topic_id.as_u32().ok_or(IggyError::FeatureUnavailable)?; + let partition_id = partition_id.ok_or(IggyError::FeatureUnavailable)?; + Ok(IggyNamespace::new(stream_id as usize, topic_id as usize, partition_id as usize).inner()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::session::ConsensusSession; + use iggy_binary_protocol::codes::{CREATE_STREAM_CODE, GET_STREAM_CODE}; + use iggy_binary_protocol::consensus::{Message, RequestHeader, iobuf::Owned}; + use iggy_binary_protocol::requests::streams::CreateStreamRequest; + use iggy_binary_protocol::requests::users::LoginRegisterRequest; + use iggy_binary_protocol::{WireEncode, WireName}; + use secrecy::SecretString; + + fn decode_request(bytes: &Bytes) -> Message<RequestHeader> { + Message::try_from(Owned::<4096>::copy_from_slice(bytes.as_ref())).unwrap() + } + + #[test] + fn register_request_uses_zero_request_and_session() { + let mut session = ConsensusSession::with_client_id(7); + let request = LoginRegisterRequest { + client_id: 7, + username: WireName::new("admin").unwrap(), + password: SecretString::from("secret"), + version: None, + client_context: None, + }; + + let bytes = encode_request(&mut session, LOGIN_REGISTER_CODE, &request.to_bytes()).unwrap(); + let request = decode_request(&bytes); + let header = request.header(); + + assert_eq!(header.operation, Operation::Register); + assert_eq!(header.request, 0); + assert_eq!(header.session, 0); + assert_eq!(header.client, 7); + assert_eq!(header.namespace, 0); + } + + #[test] + fn replicated_request_increments_request_counter() { + let mut session = ConsensusSession::with_client_id(42); + let _ = session.register_request_id(); + session.bind(99); + let payload = CreateStreamRequest { + name: WireName::new("stream").unwrap(), + } + .to_bytes(); + + let first = encode_request(&mut session, CREATE_STREAM_CODE, &payload).unwrap(); + let second = encode_request(&mut session, CREATE_STREAM_CODE, &payload).unwrap(); + + assert_eq!(decode_request(&first).header().request, 1); + assert_eq!(decode_request(&second).header().request, 2); + assert_eq!(decode_request(&second).header().session, 99); + assert_eq!(decode_request(&second).header().namespace, 0); + } + + #[test] + fn unsupported_non_replicated_request_is_rejected() { + let mut session = ConsensusSession::new(); + assert!(matches!( + encode_request(&mut session, GET_STREAM_CODE, &Bytes::new()), + Err(IggyError::FeatureUnavailable) + )); + } +} diff --git a/core/sdk/src/websocket/websocket_client.rs b/core/sdk/src/websocket/websocket_client.rs index 5dd187a88..84a849066 100644 --- a/core/sdk/src/websocket/websocket_client.rs +++ b/core/sdk/src/websocket/websocket_client.rs @@ -17,6 +17,8 @@ */ use crate::leader_aware::{LeaderRedirectionState, check_and_redirect_to_leader}; +#[cfg(feature = "vsr")] +use crate::session::ConsensusSession; use crate::websocket::websocket_connection_stream::WebSocketConnectionStream; use crate::websocket::websocket_stream_kind::WebSocketStreamKind; use crate::websocket::websocket_tls_connection_stream::WebSocketTlsConnectionStream; @@ -28,9 +30,10 @@ use async_trait::async_trait; use bytes::{BufMut, Bytes, BytesMut}; use iggy_common::{ AutoLogin, ClientState, ConnectionString, Credentials, DiagnosticEvent, IggyDuration, - IggyError, IggyErrorDiscriminants, IggyTimestamp, WebSocketClientConfig, - WebSocketConnectionStringOptions, + IggyError, IggyTimestamp, WebSocketClientConfig, WebSocketConnectionStringOptions, }; +#[cfg(not(feature = "vsr"))] +use iggy_common::IggyErrorDiscriminants; use iggy_common::{BinaryClient, BinaryTransport, PersonalAccessTokenClient, UserClient}; use secrecy::ExposeSecret; use std::net::SocketAddr; @@ -44,7 +47,9 @@ use tokio_tungstenite::{ }; use tracing::{debug, error, info, trace, warn}; +#[cfg(not(feature = "vsr"))] const REQUEST_INITIAL_BYTES_LENGTH: usize = 4; +#[cfg(not(feature = "vsr"))] const RESPONSE_INITIAL_BYTES_LENGTH: usize = 8; const NAME: &str = "WebSocket"; @@ -58,6 +63,8 @@ pub struct WebSocketClient { pub(crate) connected_at: Mutex<Option<IggyTimestamp>>, leader_redirection_state: Mutex<LeaderRedirectionState>, pub(crate) current_server_address: Mutex<String>, + #[cfg(feature = "vsr")] + consensus_session: Arc<Mutex<ConsensusSession>>, } impl Default for WebSocketClient { @@ -145,6 +152,32 @@ impl BinaryTransport for WebSocketClient { fn get_heartbeat_interval(&self) -> IggyDuration { self.config.heartbeat_interval } + + #[cfg(feature = "vsr")] + async fn get_vsr_client_id(&self) -> Result<u128, IggyError> { + Ok(self.consensus_session.lock().await.client_id()) + } + + #[cfg(feature = "vsr")] + async fn bind_vsr_session(&self, session: u64) -> Result<(), IggyError> { + if session == 0 { + return Err(IggyError::InvalidConfiguration); + } + + let mut consensus_session = self.consensus_session.lock().await; + if consensus_session.is_bound() { + return Err(IggyError::InvalidConfiguration); + } + + consensus_session.bind(session); + Ok(()) + } + + #[cfg(feature = "vsr")] + async fn reset_vsr_session(&self) -> Result<(), IggyError> { + *self.consensus_session.lock().await = ConsensusSession::new(); + Ok(()) + } } impl BinaryClient for WebSocketClient {} @@ -163,6 +196,8 @@ impl WebSocketClient { connected_at: Mutex::new(None), leader_redirection_state: Mutex::new(LeaderRedirectionState::new()), current_server_address: Mutex::new(server_address), + #[cfg(feature = "vsr")] + consensus_session: Arc::new(Mutex::new(ConsensusSession::new())), }) } @@ -520,6 +555,8 @@ impl WebSocketClient { self.set_state(ClientState::Disconnected).await; self.stream.lock().await.take(); + #[cfg(feature = "vsr")] + self.reset_vsr_session().await?; self.publish_event(DiagnosticEvent::Disconnected).await; let now = IggyTimestamp::now(); @@ -542,6 +579,8 @@ impl WebSocketClient { let _ = stream.shutdown().await; } + #[cfg(feature = "vsr")] + self.reset_vsr_session().await?; self.set_state(ClientState::Shutdown).await; self.publish_event(DiagnosticEvent::Shutdown).await; info!("{NAME} client: {client_address} has been shutdown."); @@ -571,10 +610,20 @@ impl WebSocketClient { IggyError::NotConnected })?; + #[cfg(feature = "vsr")] + let request = { + let mut consensus_session = self.consensus_session.lock().await; + crate::vsr::encode_request(&mut consensus_session, code, &payload)? + }; + #[cfg(not(feature = "vsr"))] let payload_length = payload.len() + REQUEST_INITIAL_BYTES_LENGTH; + #[cfg(not(feature = "vsr"))] let mut request = BytesMut::with_capacity(4 + REQUEST_INITIAL_BYTES_LENGTH + payload.len()); + #[cfg(not(feature = "vsr"))] request.put_u32_le(payload_length as u32); + #[cfg(not(feature = "vsr"))] request.put_u32_le(code); + #[cfg(not(feature = "vsr"))] request.put_slice(&payload); trace!( @@ -586,61 +635,82 @@ impl WebSocketClient { stream.write(&request).await?; stream.flush().await?; - let mut response_initial_buffer = vec![0u8; RESPONSE_INITIAL_BYTES_LENGTH]; - stream.read(&mut response_initial_buffer).await?; - - let status = u32::from_le_bytes([ - response_initial_buffer[0], - response_initial_buffer[1], - response_initial_buffer[2], - response_initial_buffer[3], - ]); + #[cfg(feature = "vsr")] + { + let mut response_header = vec![0u8; iggy_binary_protocol::HEADER_SIZE]; + stream.read(&mut response_header).await?; - let length = u32::from_le_bytes([ - response_initial_buffer[4], - response_initial_buffer[5], - response_initial_buffer[6], - response_initial_buffer[7], - ]) as usize; + let response_size = crate::vsr::response_size(&response_header)?; + let mut response = BytesMut::with_capacity(response_size); + response.put_slice(&response_header); - trace!( - "Received {NAME} response status: {}, length: {} bytes", - status, length - ); - - if status != 0 { - // TEMP: See https://github.com/apache/iggy/pull/604 for context. - if status == IggyErrorDiscriminants::TopicNameAlreadyExists as u32 - || status == IggyErrorDiscriminants::StreamNameAlreadyExists as u32 - || status == IggyErrorDiscriminants::UserAlreadyExists as u32 - || status == IggyErrorDiscriminants::PersonalAccessTokenAlreadyExists as u32 - || status == IggyErrorDiscriminants::ConsumerGroupNameAlreadyExists as u32 - { - debug!( - "Received a server resource already exists response: {} ({})", - status, - IggyError::from_code_as_string(status) - ) - } else { - error!( - "Received an invalid response with status: {} ({}).", - status, - IggyError::from_code_as_string(status), - ); + if response_size > iggy_binary_protocol::HEADER_SIZE { + let mut response_body = vec![0u8; response_size - iggy_binary_protocol::HEADER_SIZE]; + stream.read(&mut response_body).await?; + response.put_slice(&response_body); } - return Err(IggyError::from_code(status)); + return crate::vsr::decode_response(&response.freeze()); } - if length == 0 { - return Ok(Bytes::new()); - } + #[cfg(not(feature = "vsr"))] + { + let mut response_initial_buffer = vec![0u8; RESPONSE_INITIAL_BYTES_LENGTH]; + stream.read(&mut response_initial_buffer).await?; + + let status = u32::from_le_bytes([ + response_initial_buffer[0], + response_initial_buffer[1], + response_initial_buffer[2], + response_initial_buffer[3], + ]); + + let length = u32::from_le_bytes([ + response_initial_buffer[4], + response_initial_buffer[5], + response_initial_buffer[6], + response_initial_buffer[7], + ]) as usize; + + trace!( + "Received {NAME} response status: {}, length: {} bytes", + status, length + ); + + if status != 0 { + // TEMP: See https://github.com/apache/iggy/pull/604 for context. + if status == IggyErrorDiscriminants::TopicNameAlreadyExists as u32 + || status == IggyErrorDiscriminants::StreamNameAlreadyExists as u32 + || status == IggyErrorDiscriminants::UserAlreadyExists as u32 + || status == IggyErrorDiscriminants::PersonalAccessTokenAlreadyExists as u32 + || status == IggyErrorDiscriminants::ConsumerGroupNameAlreadyExists as u32 + { + debug!( + "Received a server resource already exists response: {} ({})", + status, + IggyError::from_code_as_string(status) + ) + } else { + error!( + "Received an invalid response with status: {} ({}).", + status, + IggyError::from_code_as_string(status), + ); + } + + return Err(IggyError::from_code(status)); + } + + if length == 0 { + return Ok(Bytes::new()); + } - let mut response_buffer = vec![0u8; length]; - stream.read(&mut response_buffer).await?; + let mut response_buffer = vec![0u8; length]; + stream.read(&mut response_buffer).await?; - trace!("Received {NAME} response payload, size: {} bytes", length); - Ok(Bytes::from(response_buffer)) + trace!("Received {NAME} response payload, size: {} bytes", length); + Ok(Bytes::from(response_buffer)) + } } }
