This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit fcb843ffab6ba93e8b465891c7b8e5d6fb6a82e1
Merge: 22dbaae8 1e3fbb54
Author: haze518 <[email protected]>
AuthorDate: Mon Jul 21 07:39:56 2025 +0600

    Merge branch 'draft-sans' of https://github.com/apache/iggy into draft-sans

 core/sdk/src/proto/connection.rs             | 104 +++++++++++++++------------
 core/sdk/src/proto/mod.rs                    |   1 +
 core/sdk/src/proto/send_buffer.rs            |   3 +
 core/sdk/src/transport_adapter/async_new.rs  |   0
 core/sdk/src/transport_adapter/connection.rs | 102 ++++++++++++++++++++++++++
 core/sdk/src/transport_adapter/mod.rs        |   2 +
 6 files changed, 168 insertions(+), 44 deletions(-)

diff --cc core/sdk/src/proto/connection.rs
index 91a4e4fc,73147a3a..81c15699
--- a/core/sdk/src/proto/connection.rs
+++ b/core/sdk/src/proto/connection.rs
@@@ -23,22 -23,33 +23,33 @@@ const ALREADY_EXISTS_STATUSES: &[u32] 
      IggyErrorDiscriminants::ConsumerGroupNameAlreadyExists as u32,
  ];
  
- pub enum StateKind {
-     Handshake,
- }
- 
  pub struct Connection {
      server_address: SocketAddr,
-     state: StateKind,
+     state: ClientState,
      config: IggyCoreConfig,
+ 
 -    pub send_buf: VecDeque<(u32 /* code */, Bytes /* payload */)>,
++    pub(crate) send_buf: VecDeque<(u32 /* code */, Bytes /* payload */)>,
+     send_waker: Option<Waker>,
+ 
+     pub recv_buf: VecDeque<Bytes>,
+     recv_waker: Option<Waker>,
  }
  
  impl Connection {
      pub fn new(config: IggyCoreConfig, server_address: SocketAddr) -> Self {
-         Self { server_address, state: StateKind::Handshake, config }
+         Self {
+             server_address,
+             state: ClientState::Disconnected,
+             config,
+             send_buf: VecDeque::new(),
+             send_waker: None,
+             recv_buf: VecDeque::new(),
+             recv_waker: None,
+         }
      }
  
-     pub fn poll_transmit(&mut self, buf: &mut Vec<u8>) -> Result<(), IggyError> {
+     // TODO add iggy code
 -    pub fn write(&mut self, data: Bytes) -> Result<(), IggyError> {
++    pub fn write(&mut self, code: u32, data: Bytes) -> Result<(), IggyError> {
          match self.state {
              ClientState::Shutdown => {
                  trace!("Cannot send data. Client is shutdown.");
@@@ -55,51 -66,50 +66,56 @@@
              _ => {}
          }
  
-         let (code, payload, id) = self.pending.pop_front()?;
-         let len = (payload.len() + REQUEST_INITIAL_BYTES_LENGTH) as u32;
 -        self.send_buf.push_back(data);
++        self.send_buf.push_back((code, data));
+ 
+         Ok(())
+     }
  
-         self.current_tx = Some(Arc::new(TxBuf{
 -    pub fn poll_transmit(&mut self, buf: &mut Vec<u8>) {
++    pub fn poll_transmit(&mut self) -> Option<TxBuf> {
++        let (code, payload) = self.send_buf.pop_front()?;
++        let len = (payload.len() + REQUEST_INITIAL_BYTES_LENGTH) as u32;
++        Some(TxBuf{
 +            hdr_len: len.to_le_bytes(),
 +            hdr_code: code.to_le_bytes(),
-             payload, 
-             id,
-         }));
- 
++            payload,
++            id: 1, // todo rm
++        })
 +    }
- }
- 
- // TODO убрать из протокола
- pub struct Connecting {
-     conn: Connection,
-     events: mpsc::UnboundedSender<DiagnosticEvent>,
- }
- 
- pub struct ConnectionRef {
- 
- }
- 
  
- pub struct State {
-     pub(crate) inner: Connection,
-     driver: Option<Waker>,
-     on_connected: Option<oneshot::Sender<bool>>,
-     connected: bool,
-     events: mpsc::UnboundedSender<DiagnosticEvent>,
-     pub(crate) blocked_writers: VecDeque<Waker>,
-     pub(crate) blocked_readers: VecDeque<Waker>,
-     pub(crate) error: Option<IggyError>,
-     runtime: Arc<dyn Runtime>,
-     send_buffer: Vec<u8>,
-     socket: Box<dyn AsyncWrite>
- }
- 
- impl State {
-     fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
-         // todo парсим self.send_buffer через connection
-         self.socket::poll_write_buf(io, cx, buf)
++    pub fn is_drained(&self) -> bool {
++        matches!(self.state, ClientState::Shutdown | ClientState::Disconnected)
      }
+ 
+     // pub fn poll_transmit(&mut self, buf: &mut Vec<u8>) -> Result<(), IggyError> {
+     //     match self.state {
+     //         ClientState::Shutdown => {
+     //             trace!("Cannot send data. Client is shutdown.");
+     //             return Err(IggyError::ClientShutdown);
+     //         }
+     //         ClientState::Disconnected => {
+     //             trace!("Cannot send data. Client is not connected.");
+     //             return Err(IggyError::NotConnected);
+     //         }
+     //         ClientState::Connecting => {
+     //             trace!("Cannot send data. Client is still connecting.");
+     //             return Err(IggyError::NotConnected);
+     //         }
+     //         _ => {}
+     //     }
+ 
+     //     let (code, payload, id) = self.pending.pop_front()?;
+     //     let len = (payload.len() + REQUEST_INITIAL_BYTES_LENGTH) as u32;
+ 
+     //     self.current_tx = Some(Arc::new(TxBuf{
+     //         hdr_len: len.to_le_bytes(),
+     //         hdr_code: code.to_le_bytes(),
+     //         payload, 
+     //         id,
+     //     }));
+ 
+     // }
  }
  
 -pub struct Connecting {
 -    conn: Connection,
 -    events: mpsc::UnboundedSender<DiagnosticEvent>,
 -}
 -
  #[derive(Debug)]
  pub struct IggyCoreConfig {
      max_retries: Option<u32>,
diff --cc core/sdk/src/transport_adapter/connection.rs
index 00000000,751cb4d4..f5a10448
mode 000000,100644..100644
--- a/core/sdk/src/transport_adapter/connection.rs
+++ b/core/sdk/src/transport_adapter/connection.rs
@@@ -1,0 -1,5 +1,102 @@@
 -use crate::proto::connection::Connection;
++use std::{io, ops::Deref, sync::{Arc, Mutex}, task::{Context, Poll, Waker}};
++use crate::{
++    proto::{connection::Connection, runtime::{Runtime}},
++};
+ 
 -pub struct ConnectionRef {
 -    conn: Connection
++use iggy_common::{DiagnosticEvent, IggyError};
++use tokio::{io::AsyncWrite, sync::{mpsc, oneshot}};
++
++use crate::proto::{connection::{TxBuf}};
++
++pub struct Connecting {
++    conn: Connection,
++    events: mpsc::UnboundedSender<DiagnosticEvent>,
++}
++
++impl Connecting {
++    
++}
++
++pub struct State {
++    pub(crate) inner: Connection,
++    driver: Option<Waker>,
++    on_connected: Option<oneshot::Sender<bool>>,
++    connected: bool,
++    // events: mpsc::UnboundedSender<DiagnosticEvent>,
++    pub(crate) blocked_writer: Option<Waker>,
++    pub(crate) blocked_reader: Option<Waker>,
++    pub(crate) error: Option<IggyError>,
++    runtime: Arc<dyn Runtime>,
++    send_buffer: Vec<u8>,
++    current_tx: Option<TxBuf>,
++    // socket: Box<dyn AsyncWrite>
+ }
++
++impl State {
++    fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
++        let tx_buf = match &self.current_tx {
++            Some(tx) => tx,
++            None => {
++                let tx_buf = self.inner.poll_transmit();
++                if tx_buf.is_none() {
++                    return Ok(false)
++                }
++                self.current_tx = tx_buf;
++                self.current_tx.as_ref().unwrap()
++            }
++        };
++
++        // try_send to socket
++
++        Ok(true)
++    }
++}
++
++pub struct ConnectionInner {
++    pub(crate) state: Mutex<State>,
++}
++
++pub struct ConnectionRef(Arc<ConnectionInner>);
++
++impl Deref for ConnectionRef {
++    type Target = ConnectionInner;
++    fn deref(&self) -> &Self::Target {
++        &self.0
++    }
++}
++
++impl ConnectionRef {
++    fn new(conn: Connection, runtime: Arc<dyn Runtime>) -> Self {
++        Self(Arc::new(ConnectionInner {
++            state: Mutex::new(State{
++                inner: conn,
++                driver: None,
++                on_connected: None,
++                connected: false,
++                blocked_writer: None,
++                blocked_reader: None,
++                error: None,
++                runtime,
++                send_buffer: Vec::new(),
++                current_tx: None,
++            })
++        }))
++    }
++}
++
++pub struct ConnectionDriver(ConnectionRef);
++
++impl Future for ConnectionDriver {
++    type Output = Result<(), io::Error>;
++
++    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
++        let mut conn = self.0.state.lock().unwrap();
++        let _ = conn.drive_transmit(cx)?;
++
++        if !conn.inner.is_drained() {
++            conn.driver = Some(cx.waker().clone());
++            return Poll::Pending;
++        }
++        Poll::Ready(Ok(()))
++    }
++}
diff --cc core/sdk/src/transport_adapter/mod.rs
index d1656f5f,9c958231..414d32df
--- a/core/sdk/src/transport_adapter/mod.rs
+++ b/core/sdk/src/transport_adapter/mod.rs
@@@ -1,4 -1,7 +1,6 @@@
  pub mod r#async;
+ pub mod async_new;
+ pub mod connection;
 -pub mod state;
  
  use std::{pin::Pin, sync::Arc};
  

Reply via email to