This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch draft-sans in repository https://gitbox.apache.org/repos/asf/iggy.git
commit fcb843ffab6ba93e8b465891c7b8e5d6fb6a82e1 Merge: 22dbaae8 1e3fbb54 Author: haze518 <[email protected]> AuthorDate: Mon Jul 21 07:39:56 2025 +0600 Merge branch 'draft-sans' of https://github.com/apache/iggy into draft-sans core/sdk/src/proto/connection.rs | 104 +++++++++++++++------------ core/sdk/src/proto/mod.rs | 1 + core/sdk/src/proto/send_buffer.rs | 3 + core/sdk/src/transport_adapter/async_new.rs | 0 core/sdk/src/transport_adapter/connection.rs | 102 ++++++++++++++++++++++++++ core/sdk/src/transport_adapter/mod.rs | 2 + 6 files changed, 168 insertions(+), 44 deletions(-) diff --cc core/sdk/src/proto/connection.rs index 91a4e4fc,73147a3a..81c15699 --- a/core/sdk/src/proto/connection.rs +++ b/core/sdk/src/proto/connection.rs @@@ -23,22 -23,33 +23,33 @@@ const ALREADY_EXISTS_STATUSES: &[u32] IggyErrorDiscriminants::ConsumerGroupNameAlreadyExists as u32, ]; - pub enum StateKind { - Handshake, - } - pub struct Connection { server_address: SocketAddr, - state: StateKind, + state: ClientState, config: IggyCoreConfig, + - pub send_buf: VecDeque<(u32 /* code */, Bytes /* payload */)>, ++ pub(crate) send_buf: VecDeque<(u32 /* code */, Bytes /* payload */)>, + send_waker: Option<Waker>, + + pub recv_buf: VecDeque<Bytes>, + recv_waker: Option<Waker>, } impl Connection { pub fn new(config: IggyCoreConfig, server_address: SocketAddr) -> Self { - Self { server_address, state: StateKind::Handshake, config } + Self { + server_address, + state: ClientState::Disconnected, + config, + send_buf: VecDeque::new(), + send_waker: None, + recv_buf: VecDeque::new(), + recv_waker: None, + } } - pub fn poll_transmit(&mut self, buf: &mut Vec<u8>) -> Result<(), IggyError> { + // TODO add iggy code - pub fn write(&mut self, data: Bytes) -> Result<(), IggyError> { ++ pub fn write(&mut self, code: u32, data: Bytes) -> Result<(), IggyError> { match self.state { ClientState::Shutdown => { trace!("Cannot send data. Client is shutdown."); @@@ -55,51 -66,50 +66,56 @@@ _ => {} } - let (code, payload, id) = self.pending.pop_front()?; - let len = (payload.len() + REQUEST_INITIAL_BYTES_LENGTH) as u32; - self.send_buf.push_back(data); ++ self.send_buf.push_back((code, data)); + + Ok(()) + } - self.current_tx = Some(Arc::new(TxBuf{ - pub fn poll_transmit(&mut self, buf: &mut Vec<u8>) { ++ pub fn poll_transmit(&mut self) -> Option<TxBuf> { ++ let (code, payload) = self.send_buf.pop_front()?; ++ let len = (payload.len() + REQUEST_INITIAL_BYTES_LENGTH) as u32; ++ Some(TxBuf{ + hdr_len: len.to_le_bytes(), + hdr_code: code.to_le_bytes(), - payload, - id, - })); - ++ payload, ++ id: 1, // todo rm ++ }) + } - } - - // TODO убрать из протокола - pub struct Connecting { - conn: Connection, - events: mpsc::UnboundedSender<DiagnosticEvent>, - } - - pub struct ConnectionRef { - - } - - pub struct State { - pub(crate) inner: Connection, - driver: Option<Waker>, - on_connected: Option<oneshot::Sender<bool>>, - connected: bool, - events: mpsc::UnboundedSender<DiagnosticEvent>, - pub(crate) blocked_writers: VecDeque<Waker>, - pub(crate) blocked_readers: VecDeque<Waker>, - pub(crate) error: Option<IggyError>, - runtime: Arc<dyn Runtime>, - send_buffer: Vec<u8>, - socket: Box<dyn AsyncWrite> - } - - impl State { - fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> { - // todo парсим self.send_buffer через connection - self.socket::poll_write_buf(io, cx, buf) ++ pub fn is_drained(&self) -> bool { ++ matches!(self.state, ClientState::Shutdown | ClientState::Disconnected) } + + // pub fn poll_transmit(&mut self, buf: &mut Vec<u8>) -> Result<(), IggyError> { + // match self.state { + // ClientState::Shutdown => { + // trace!("Cannot send data. Client is shutdown."); + // return Err(IggyError::ClientShutdown); + // } + // ClientState::Disconnected => { + // trace!("Cannot send data. Client is not connected."); + // return Err(IggyError::NotConnected); + // } + // ClientState::Connecting => { + // trace!("Cannot send data. Client is still connecting."); + // return Err(IggyError::NotConnected); + // } + // _ => {} + // } + + // let (code, payload, id) = self.pending.pop_front()?; + // let len = (payload.len() + REQUEST_INITIAL_BYTES_LENGTH) as u32; + + // self.current_tx = Some(Arc::new(TxBuf{ + // hdr_len: len.to_le_bytes(), + // hdr_code: code.to_le_bytes(), + // payload, + // id, + // })); + + // } } -pub struct Connecting { - conn: Connection, - events: mpsc::UnboundedSender<DiagnosticEvent>, -} - #[derive(Debug)] pub struct IggyCoreConfig { max_retries: Option<u32>, diff --cc core/sdk/src/transport_adapter/connection.rs index 00000000,751cb4d4..f5a10448 mode 000000,100644..100644 --- a/core/sdk/src/transport_adapter/connection.rs +++ b/core/sdk/src/transport_adapter/connection.rs @@@ -1,0 -1,5 +1,102 @@@ -use crate::proto::connection::Connection; ++use std::{io, ops::Deref, sync::{Arc, Mutex}, task::{Context, Poll, Waker}}; ++use crate::{ ++ proto::{connection::Connection, runtime::{Runtime}}, ++}; + -pub struct ConnectionRef { - conn: Connection ++use iggy_common::{DiagnosticEvent, IggyError}; ++use tokio::{io::AsyncWrite, sync::{mpsc, oneshot}}; ++ ++use crate::proto::{connection::{TxBuf}}; ++ ++pub struct Connecting { ++ conn: Connection, ++ events: mpsc::UnboundedSender<DiagnosticEvent>, ++} ++ ++impl Connecting { ++ ++} ++ ++pub struct State { ++ pub(crate) inner: Connection, ++ driver: Option<Waker>, ++ on_connected: Option<oneshot::Sender<bool>>, ++ connected: bool, ++ // events: mpsc::UnboundedSender<DiagnosticEvent>, ++ pub(crate) blocked_writer: Option<Waker>, ++ pub(crate) blocked_reader: Option<Waker>, ++ pub(crate) error: Option<IggyError>, ++ runtime: Arc<dyn Runtime>, ++ send_buffer: Vec<u8>, ++ current_tx: Option<TxBuf>, ++ // socket: Box<dyn AsyncWrite> + } ++ ++impl State { ++ fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> { ++ let tx_buf = match &self.current_tx { ++ Some(tx) => tx, ++ None => { ++ let tx_buf = self.inner.poll_transmit(); ++ if tx_buf.is_none() { ++ return Ok(false) ++ } ++ self.current_tx = tx_buf; ++ self.current_tx.as_ref().unwrap() ++ } ++ }; ++ ++ // try_send to socket ++ ++ Ok(true) ++ } ++} ++ ++pub struct ConnectionInner { ++ pub(crate) state: Mutex<State>, ++} ++ ++pub struct ConnectionRef(Arc<ConnectionInner>); ++ ++impl Deref for ConnectionRef { ++ type Target = ConnectionInner; ++ fn deref(&self) -> &Self::Target { ++ &self.0 ++ } ++} ++ ++impl ConnectionRef { ++ fn new(conn: Connection, runtime: Arc<dyn Runtime>) -> Self { ++ Self(Arc::new(ConnectionInner { ++ state: Mutex::new(State{ ++ inner: conn, ++ driver: None, ++ on_connected: None, ++ connected: false, ++ blocked_writer: None, ++ blocked_reader: None, ++ error: None, ++ runtime, ++ send_buffer: Vec::new(), ++ current_tx: None, ++ }) ++ })) ++ } ++} ++ ++pub struct ConnectionDriver(ConnectionRef); ++ ++impl Future for ConnectionDriver { ++ type Output = Result<(), io::Error>; ++ ++ fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> { ++ let mut conn = self.0.state.lock().unwrap(); ++ let _ = conn.drive_transmit(cx)?; ++ ++ if !conn.inner.is_drained() { ++ conn.driver = Some(cx.waker().clone()); ++ return Poll::Pending; ++ } ++ Poll::Ready(Ok(())) ++ } ++} diff --cc core/sdk/src/transport_adapter/mod.rs index d1656f5f,9c958231..414d32df --- a/core/sdk/src/transport_adapter/mod.rs +++ b/core/sdk/src/transport_adapter/mod.rs @@@ -1,4 -1,7 +1,6 @@@ pub mod r#async; + pub mod async_new; + pub mod connection; -pub mod state; use std::{pin::Pin, sync::Arc};
