This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch draft-sans in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 3bd96e3c6f7ca179763fb1631adcde27b56256c5 Author: haze518 <[email protected]> AuthorDate: Tue Jul 1 09:43:33 2025 +0600 del --- core/sdk/src/clients/client.rs | 4 ++ core/sdk/src/proto/connection.rs | 105 ++++++++++++++++++++++++-------------- core/sdk/src/proto/mod.rs | 3 +- core/sdk/src/proto/runtime.rs | 18 +++++++ core/sdk/src/proto/send_buffer.rs | 85 ++++++++++++++++++++++++++++++ core/sdk/src/proto/tcp_adapter.rs | 31 +++++++++++ 6 files changed, 208 insertions(+), 38 deletions(-) diff --git a/core/sdk/src/clients/client.rs b/core/sdk/src/clients/client.rs index 4896e629..5c4aff2d 100644 --- a/core/sdk/src/clients/client.rs +++ b/core/sdk/src/clients/client.rs @@ -39,6 +39,10 @@ use tokio::time::sleep; use tracing::log::warn; use tracing::{debug, error, info}; +// pub struct IggyClientNew { +// pub(crate) +// } + /// The main client struct which implements all the `Client` traits and wraps the underlying low-level client for the specific transport. /// /// It also provides the additional builders for the standalone consumer, consumer group, and producer. diff --git a/core/sdk/src/proto/connection.rs b/core/sdk/src/proto/connection.rs index 48821011..69d0fd68 100644 --- a/core/sdk/src/proto/connection.rs +++ b/core/sdk/src/proto/connection.rs @@ -1,11 +1,22 @@ use std::{collections::VecDeque, pin::Pin, sync::Arc}; -use bytes::Bytes; -use iggy_common::{ClientState, Command, IggyDuration, IggyError, IggyTimestamp}; +use bytes::{Bytes, BytesMut}; +use iggy_common::{ClientState, Command, IggyDuration, IggyError, IggyErrorDiscriminants, IggyTimestamp}; use std::io::IoSlice; -use tracing::trace; +use tracing::{error, trace}; const REQUEST_INITIAL_BYTES_LENGTH: usize = 4; +const RESPONSE_INITIAL_BYTES_LENGTH: usize = 8; +const ALREADY_EXISTS_STATUSES: &[u32] = &[ + IggyErrorDiscriminants::TopicIdAlreadyExists as u32, + IggyErrorDiscriminants::TopicNameAlreadyExists as u32, + IggyErrorDiscriminants::StreamIdAlreadyExists as u32, + IggyErrorDiscriminants::StreamNameAlreadyExists as u32, + IggyErrorDiscriminants::UserAlreadyExists as u32, + IggyErrorDiscriminants::PersonalAccessTokenAlreadyExists as u32, + IggyErrorDiscriminants::ConsumerGroupIdAlreadyExists as u32, + IggyErrorDiscriminants::ConsumerGroupNameAlreadyExists as u32, +]; pub trait TransportConfig { fn resstablish_after(&self) -> IggyDuration; @@ -38,18 +49,24 @@ pub enum Order { Noop, } +pub enum InboundResult { + Need(usize), + Response(Bytes), + Error(IggyError), +} + pub struct IggyCore { state: ClientState, last_connect: Option<IggyTimestamp>, - reconnect_us: u64, - pending: VecDeque<Box<dyn Command>>, - config: Box<dyn TransportConfig>, // todo rewrite via generic + pending: VecDeque<(u32 /* code */, Bytes /* payload */)>, + config: Arc<dyn TransportConfig + Send + Sync + 'static>, // todo rewrite via generic retry_count: u32, current_tx: Option<TxBuf>, + rx_buf: BytesMut, } impl IggyCore { - pub fn write(&mut self, cmd: impl Command) -> Result<_, IggyError> { + pub fn write(&mut self, cmd: &impl Command) -> Result<(), IggyError> { match self.state { ClientState::Shutdown => { trace!("Cannot send data. Client is shutdown."); @@ -65,7 +82,7 @@ impl IggyCore { } _ => {} } - self.pending.push_back(Box::new(cmd)); + self.pending.push_back((cmd.code(), cmd.to_bytes())); Ok(()) } @@ -125,46 +142,60 @@ impl IggyCore { // TODO вызывать при async fn poll pub fn poll_transmit(&mut self) -> Option<&TxBuf> { if self.current_tx.is_none() { - let cmd = self.pending.pop_front()?; - let payload = cmd.to_bytes(); + let (code, payload) = self.pending.pop_front()?; let len = (payload.len() + REQUEST_INITIAL_BYTES_LENGTH) as u32; self.current_tx = Some(TxBuf { hdr_len: len.to_le_bytes(), - hdr_code: cmd.code().to_le_bytes(), + hdr_code: code.to_le_bytes(), payload, }); } self.current_tx.as_ref() } -} - -// pub trait ConnectionAdapter: Send + Sync + 'static { -// fn write(&mut self, buf: &[u8]) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> + Send>>; -// } -// pub trait Runtime { - -// } - -// // pin_project! { -// pub struct OpenConn<'a> { -// conn: &'a Connection, -// } -// // } - -// impl Future for OpenConn<'_> { -// type Output = Result<Connection, IggyError>; -// fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> { + pub fn feed_inbound(&mut self, bytes: &[u8]) -> InboundResult { + if self.rx_buf.len() < RESPONSE_INITIAL_BYTES_LENGTH { + let need = RESPONSE_INITIAL_BYTES_LENGTH - self.rx_buf.len(); + self.rx_buf.extend_from_slice(bytes); + if self.rx_buf.len() < RESPONSE_INITIAL_BYTES_LENGTH { + return InboundResult::Need(need); + } + } -// } -// } + let status = u32::from_le_bytes(self.rx_buf[0..4].try_into().unwrap()); + let length = u32::from_le_bytes(self.rx_buf[4..8].try_into().unwrap()); + + if status != 0 { + if ALREADY_EXISTS_STATUSES.contains(&status) { + tracing::debug!( + "Received a server resource already exists response: {} ({})", + status, + IggyError::from_code_as_string(status) + ) + } else { + error!( + "Received an invalid response with status: {} ({}).", + status, + IggyError::from_code_as_string(status), + ); + } + return InboundResult::Error(IggyError::from_code(status)); + } -// pub struct Connection { -// adapter: Arc<dyn ConnectionAdapter>, -// runtime: Arc<dyn Runtime>, -// } + trace!("Status: OK. Response length: {}", length); + if length <= 1 { + return InboundResult::Response(Bytes::new()); + } -// impl Connection { + let body_len = length as usize; + let got = self.rx_buf.len() - RESPONSE_INITIAL_BYTES_LENGTH; + if got < body_len { + return InboundResult::Need(body_len - got); + } -// } + let mut full = self.rx_buf.split_to(8 + body_len); + let body = full.split_off(8).freeze(); + InboundResult::Response(body) + } +} diff --git a/core/sdk/src/proto/mod.rs b/core/sdk/src/proto/mod.rs index fdacce0f..478f6fac 100644 --- a/core/sdk/src/proto/mod.rs +++ b/core/sdk/src/proto/mod.rs @@ -1,4 +1,6 @@ pub mod connection; +pub mod tcp_adapter; +pub mod runtime; use std::collections::VecDeque; @@ -16,4 +18,3 @@ impl IggyCore { } } - diff --git a/core/sdk/src/proto/runtime.rs b/core/sdk/src/proto/runtime.rs new file mode 100644 index 00000000..7adb1b19 --- /dev/null +++ b/core/sdk/src/proto/runtime.rs @@ -0,0 +1,18 @@ +use std::{ops::{Deref, DerefMut}, pin::Pin}; + +pub trait Runtime: Sync + Send + 'static { + type Mutex<T>: Lockable<T> + Send + Sync + 'static + where + T: Send + 'static; + + fn mutex<T: Send + 'static>(&self, value: T) -> Self::Mutex<T>; +} + +pub trait Lockable<T>: Send + Sync + 'static { + type Guard<'a>: Deref<Target = T> + DerefMut + 'a + where + Self: 'a, + T: 'a; + + fn lock(&self) -> Pin<Box<dyn Future<Output = Self::Guard<'_>> + Send + '_>>; +} diff --git a/core/sdk/src/proto/send_buffer.rs b/core/sdk/src/proto/send_buffer.rs new file mode 100644 index 00000000..60e0b7aa --- /dev/null +++ b/core/sdk/src/proto/send_buffer.rs @@ -0,0 +1,85 @@ +use std::collections::VecDeque; + +use bytes::Bytes; +use iggy_common::{ClientState, IggyDuration, IggyError, IggyTimestamp}; +use tracing::{info, trace}; + +pub struct SendBuffer { + data: VecDeque<Bytes> +} + +impl SendBuffer { + pub fn write(&mut self, payload: Bytes) { + self.data.push_back(payload); + } + + pub fn poll_transmit(&mut self) -> Option<Bytes> { + self.data.pop_front() + } +} + +pub struct IggyCore { + data: VecDeque<Bytes>, + state: ClientState, + connected_at: Option<IggyTimestamp>, + reconnect_interval: u64, // micros +} + +impl IggyCore { + pub fn write(&mut self, payload: Bytes) { + self.data.push_back(payload); + } + + /* + + // Псевдо-код адаптера: + loop { + if let Some(wait) = core.reconnect_wait() { + // тут — await, если надо (или thread::sleep) + sleep(wait.get_duration()).await; + } + match core.connect() { + Ok(_) => break, + Err(e) => { + // обработать ошибку + break; + } + } + } + */ + pub fn connect(&mut self) -> Result<(), IggyError> { + match self.state { + ClientState::Shutdown => { + trace!("Cannot connect. Client is shutdown."); + return Err(IggyError::ClientShutdown); + } + ClientState::Connected | ClientState::Authenticating | ClientState::Authenticated => { + trace!("Client is already connected."); + return Ok(()); + } + ClientState::Connecting => { + trace!("Client is already connecting."); + return Ok(()); + } + _ => {} + }; + + self.state = ClientState::Connecting; + Ok(()) + } + + pub fn reconnect_wait(&self) -> Option<IggyDuration> { + if let Some(connected_at) = self.connected_at.as_ref() { + let now = IggyTimestamp::now(); + let elapsed = now.as_micros() - connected_at.as_micros(); + if elapsed < self.reconnect_interval { + Some(IggyDuration::from(self.reconnect_interval - elapsed)) + } else { + None + } + } else { + None + } + } +} + diff --git a/core/sdk/src/proto/tcp_adapter.rs b/core/sdk/src/proto/tcp_adapter.rs new file mode 100644 index 00000000..1fc06f3e --- /dev/null +++ b/core/sdk/src/proto/tcp_adapter.rs @@ -0,0 +1,31 @@ +use std::task::Context; + +use bytes::Bytes; +use futures::future::poll_fn; +use iggy_common::{Command, IggyError}; + +use crate::proto::{connection::IggyCore, runtime::{Lockable, Runtime}}; +use crate::tcp::tcp_connection_stream_kind::ConnectionStreamKind; + +pub struct TCPAdapter<R: Runtime> { + rt: R, + core: R::Mutex<IggyCore>, + pub(crate) stream: R::Mutex<Option<ConnectionStreamKind>>, +} + +impl<R: Runtime> TCPAdapter<R> { + pub async fn send_with_response(&self, command: &impl Command) -> Result<Bytes, IggyError> { + // poll_fn(|cx| ) + Ok(Bytes::new()) + } + + async fn write(&self, command: &impl Command) -> Result<(), IggyError> { + command.validate()?; + self.core.lock().await.write(command) + } + + fn execute_poll(&mut self, cx: &mut Context, command: &impl Command) -> Result<Bytes, IggyError> { + self.write(command).await?; + + } +}
