This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch draft-sans in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 6baf14beb1e550522e09e44f04ed3f079faa1507 Author: haze518 <ashr...@gmail.com> AuthorDate: Fri Jun 27 20:18:58 2025 +0600 draft --- core/sdk/src/lib.rs | 1 + core/sdk/src/proto/connection.rs | 170 ++++++++++++++++++++++++++++++++++++++ core/sdk/src/proto/mod.rs | 19 +++++ core/sdk/src/quic/quick_client.rs | 1 + 4 files changed, 191 insertions(+) diff --git a/core/sdk/src/lib.rs b/core/sdk/src/lib.rs index 25c7bf58..d1c5397d 100644 --- a/core/sdk/src/lib.rs +++ b/core/sdk/src/lib.rs @@ -26,3 +26,4 @@ pub mod prelude; pub mod quic; pub mod stream_builder; pub mod tcp; +pub mod proto; diff --git a/core/sdk/src/proto/connection.rs b/core/sdk/src/proto/connection.rs new file mode 100644 index 00000000..48821011 --- /dev/null +++ b/core/sdk/src/proto/connection.rs @@ -0,0 +1,170 @@ +use std::{collections::VecDeque, pin::Pin, sync::Arc}; + +use bytes::Bytes; +use iggy_common::{ClientState, Command, IggyDuration, IggyError, IggyTimestamp}; +use std::io::IoSlice; +use tracing::trace; + +const REQUEST_INITIAL_BYTES_LENGTH: usize = 4; + +pub trait TransportConfig { + fn resstablish_after(&self) -> IggyDuration; + fn max_retries(&self) -> Option<u32>; +} + +pub struct TxBuf { + hdr_len: [u8; 4], + hdr_code: [u8; 4], + payload: Bytes, +} + +impl TxBuf { + pub fn as_slices(&self) -> [IoSlice<'_>; 3] { + [ + IoSlice::new(&self.hdr_len), + IoSlice::new(&self.hdr_code), + IoSlice::new(&self.payload), + ] + } +} + +pub enum Order { + Outbound(Box<dyn Command>), + State(ClientState), + Wait(IggyDuration), + Response(Bytes), + Reconnect, + InitialConnect, + Noop, +} + +pub struct IggyCore { + state: ClientState, + last_connect: Option<IggyTimestamp>, + reconnect_us: u64, + pending: VecDeque<Box<dyn Command>>, + config: Box<dyn TransportConfig>, // todo rewrite via generic + retry_count: u32, + current_tx: Option<TxBuf>, +} + +impl IggyCore { + pub fn write(&mut self, cmd: impl Command) -> Result<_, IggyError> { + match self.state { + ClientState::Shutdown => { + trace!("Cannot send data. Client is shutdown."); + return Err(IggyError::ClientShutdown); + } + ClientState::Disconnected => { + trace!("Cannot send data. Client is not connected."); + return Err(IggyError::NotConnected); + } + ClientState::Connecting => { + trace!("Cannot send data. Client is still connecting."); + return Err(IggyError::NotConnected); + } + _ => {} + } + self.pending.push_back(Box::new(cmd)); + Ok(()) + } + + pub fn start_connect(&mut self) -> Result<Order, IggyError> { + self.connect(Order::InitialConnect) + } + + pub fn poll_connect(&mut self) -> Result<Order, IggyError> { + self.connect(Order::Reconnect) + } + + fn connect(&mut self, request: Order) -> Result<Order, IggyError> { + match (self.state, request) { + (ClientState::Shutdown, _) => { + trace!("Cannot connect. Client is shutdown."); + return Err(IggyError::ClientShutdown); + } + ( + ClientState::Connected | ClientState::Authenticating | ClientState::Authenticated, + _, + ) => { + trace!("Client: client_address is already connected."); + return Ok(Order::Noop); + } + (ClientState::Connecting, Order::Reconnect) => {} + (ClientState::Connecting, _) => { + trace!("Client is already connecting."); + return Ok(Order::Noop); + } + _ => {} + }; + + self.state = ClientState::Connecting; + + if let Some(max_retries) = self.config.max_retries() { + if self.retry_count >= max_retries { + self.state = ClientState::Disconnected; + return Err(IggyError::CannotEstablishConnection); + } + } + + if let Some(last_connect) = self.last_connect { + let now = IggyTimestamp::now(); + let elapsed = now.as_micros() - last_connect.as_micros(); + let interval = self.config.resstablish_after().as_micros(); + if elapsed < interval { + let remaining = IggyDuration::from(interval - elapsed); + return Ok(Order::Wait(remaining)); + } + } + + self.retry_count += 1; + self.last_connect = Some(IggyTimestamp::now()); + Ok(Order::Reconnect) + } + + // TODO вызывать при async fn poll + pub fn poll_transmit(&mut self) -> Option<&TxBuf> { + if self.current_tx.is_none() { + let cmd = self.pending.pop_front()?; + let payload = cmd.to_bytes(); + let len = (payload.len() + REQUEST_INITIAL_BYTES_LENGTH) as u32; + + self.current_tx = Some(TxBuf { + hdr_len: len.to_le_bytes(), + hdr_code: cmd.code().to_le_bytes(), + payload, + }); + } + self.current_tx.as_ref() + } +} + +// pub trait ConnectionAdapter: Send + Sync + 'static { +// fn write(&mut self, buf: &[u8]) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> + Send>>; +// } + +// pub trait Runtime { + +// } + +// // pin_project! { +// pub struct OpenConn<'a> { +// conn: &'a Connection, +// } +// // } + +// impl Future for OpenConn<'_> { +// type Output = Result<Connection, IggyError>; +// fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> { + +// } +// } + +// pub struct Connection { +// adapter: Arc<dyn ConnectionAdapter>, +// runtime: Arc<dyn Runtime>, +// } + +// impl Connection { + +// } diff --git a/core/sdk/src/proto/mod.rs b/core/sdk/src/proto/mod.rs new file mode 100644 index 00000000..fdacce0f --- /dev/null +++ b/core/sdk/src/proto/mod.rs @@ -0,0 +1,19 @@ +pub mod connection; + +use std::collections::VecDeque; + +use bytes::Bytes; +use iggy_common::ClientState; + +pub struct IggyCore { + buffer: VecDeque<Bytes>, + current_state: ClientState, +} + +impl IggyCore { + pub fn write(&mut self, payload: Bytes) { + self.buffer.push_back(payload) + } + +} + diff --git a/core/sdk/src/quic/quick_client.rs b/core/sdk/src/quic/quick_client.rs index 7d0211ae..9317cd27 100644 --- a/core/sdk/src/quic/quick_client.rs +++ b/core/sdk/src/quic/quick_client.rs @@ -458,6 +458,7 @@ impl QuicClient { IggyError::QuicError })?; trace!("Sending a QUIC request with code: {code}"); + send.write_all(&(payload_length as u32).to_le_bytes()) .await .map_err(|error| {