This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 853d3077e89e544026ae85582ab273ce5f851119
Author: haze518 <[email protected]>
AuthorDate: Tue Jul 1 06:26:28 2025 +0600

    del
---
 core/sdk/src/clients/client.rs    |   4 ++
 core/sdk/src/proto/connection.rs  | 105 ++++++++++++++++++++++++--------------
 core/sdk/src/proto/mod.rs         |   3 +-
 core/sdk/src/proto/runtime.rs     |  18 +++++++
 core/sdk/src/proto/send_buffer.rs |  85 ++++++++++++++++++++++++++++++
 core/sdk/src/proto/tcp_adapter.rs |  18 +++++++
 6 files changed, 195 insertions(+), 38 deletions(-)

diff --git a/core/sdk/src/clients/client.rs b/core/sdk/src/clients/client.rs
index 4896e629..5c4aff2d 100644
--- a/core/sdk/src/clients/client.rs
+++ b/core/sdk/src/clients/client.rs
@@ -39,6 +39,10 @@ use tokio::time::sleep;
 use tracing::log::warn;
 use tracing::{debug, error, info};
 
+// pub struct IggyClientNew {
+//     pub(crate) 
+// }
+
 /// The main client struct which implements all the `Client` traits and wraps 
the underlying low-level client for the specific transport.
 ///
 /// It also provides the additional builders for the standalone consumer, 
consumer group, and producer.
diff --git a/core/sdk/src/proto/connection.rs b/core/sdk/src/proto/connection.rs
index 48821011..9cd11354 100644
--- a/core/sdk/src/proto/connection.rs
+++ b/core/sdk/src/proto/connection.rs
@@ -1,11 +1,22 @@
 use std::{collections::VecDeque, pin::Pin, sync::Arc};
 
-use bytes::Bytes;
-use iggy_common::{ClientState, Command, IggyDuration, IggyError, 
IggyTimestamp};
+use bytes::{Bytes, BytesMut};
+use iggy_common::{ClientState, Command, IggyDuration, IggyError, 
IggyErrorDiscriminants, IggyTimestamp};
 use std::io::IoSlice;
-use tracing::trace;
+use tracing::{error, trace};
 
 const REQUEST_INITIAL_BYTES_LENGTH: usize = 4;
+const RESPONSE_INITIAL_BYTES_LENGTH: usize = 8;
+const ALREADY_EXISTS_STATUSES: &[u32] = &[
+    IggyErrorDiscriminants::TopicIdAlreadyExists as u32,
+    IggyErrorDiscriminants::TopicNameAlreadyExists as u32,
+    IggyErrorDiscriminants::StreamIdAlreadyExists as u32,
+    IggyErrorDiscriminants::StreamNameAlreadyExists as u32,
+    IggyErrorDiscriminants::UserAlreadyExists as u32,
+    IggyErrorDiscriminants::PersonalAccessTokenAlreadyExists as u32,
+    IggyErrorDiscriminants::ConsumerGroupIdAlreadyExists as u32,
+    IggyErrorDiscriminants::ConsumerGroupNameAlreadyExists as u32,
+];
 
 pub trait TransportConfig {
     fn resstablish_after(&self) -> IggyDuration;
@@ -38,18 +49,24 @@ pub enum Order {
     Noop,
 }
 
+pub enum InboundResult {
+    Need(usize),        
+    Response(Bytes),    
+    Error(IggyError),   
+}
+
 pub struct IggyCore {
     state: ClientState,
     last_connect: Option<IggyTimestamp>,
-    reconnect_us: u64,
-    pending: VecDeque<Box<dyn Command>>,
-    config: Box<dyn TransportConfig>, // todo rewrite via generic
+    pending: VecDeque<(u32 /* code */, Bytes /* payload */)>,
+    config: Arc<dyn TransportConfig + Send + Sync + 'static>, // todo rewrite 
via generic
     retry_count: u32,
     current_tx: Option<TxBuf>,
+    rx_buf: BytesMut,
 }
 
 impl IggyCore {
-    pub fn write(&mut self, cmd: impl Command) -> Result<_, IggyError> {
+    pub fn write(&mut self, cmd: impl Command) -> Result<(), IggyError> {
         match self.state {
             ClientState::Shutdown => {
                 trace!("Cannot send data. Client is shutdown.");
@@ -65,7 +82,7 @@ impl IggyCore {
             }
             _ => {}
         }
-        self.pending.push_back(Box::new(cmd));
+        self.pending.push_back((cmd.code(), cmd.to_bytes()));
         Ok(())
     }
 
@@ -125,46 +142,60 @@ impl IggyCore {
     // TODO вызывать при async fn poll
     pub fn poll_transmit(&mut self) -> Option<&TxBuf> {
         if self.current_tx.is_none() {
-            let cmd = self.pending.pop_front()?;
-            let payload = cmd.to_bytes();
+            let (code, payload) = self.pending.pop_front()?;
             let len = (payload.len() + REQUEST_INITIAL_BYTES_LENGTH) as u32;
 
             self.current_tx = Some(TxBuf {
                 hdr_len: len.to_le_bytes(),
-                hdr_code: cmd.code().to_le_bytes(),
+                hdr_code: code.to_le_bytes(),
                 payload,
             });
         }
         self.current_tx.as_ref()
     }
-}
-
-// pub trait ConnectionAdapter: Send + Sync + 'static {
-//     fn write(&mut self, buf: &[u8]) -> Pin<Box<dyn Future<Output = 
Result<(), IggyError>> + Send>>;
-// }
 
-// pub trait Runtime {
-
-// }
-
-// // pin_project! {
-// pub struct OpenConn<'a> {
-//     conn: &'a Connection,
-// }
-// // }
-
-// impl Future for OpenConn<'_> {
-//     type Output = Result<Connection, IggyError>;
-//     fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> 
std::task::Poll<Self::Output> {
+    pub fn feed_inbound(&mut self, bytes: &[u8]) -> InboundResult {
+        if self.rx_buf.len() < RESPONSE_INITIAL_BYTES_LENGTH {
+            let need = RESPONSE_INITIAL_BYTES_LENGTH - self.rx_buf.len();
+            self.rx_buf.extend_from_slice(bytes);
+            if self.rx_buf.len() < RESPONSE_INITIAL_BYTES_LENGTH {
+                return InboundResult::Need(need);
+            }
+        }
 
-//     }
-// }
+        let status = u32::from_le_bytes(self.rx_buf[0..4].try_into().unwrap());
+        let length = u32::from_le_bytes(self.rx_buf[4..8].try_into().unwrap());
+
+        if status != 0 {
+            if ALREADY_EXISTS_STATUSES.contains(&status) {
+                tracing::debug!(
+                    "Received a server resource already exists response: {} 
({})",
+                    status,
+                    IggyError::from_code_as_string(status)
+                )
+            } else {
+                error!(
+                    "Received an invalid response with status: {} ({}).",
+                    status,
+                    IggyError::from_code_as_string(status),
+                );
+            }
+            return InboundResult::Error(IggyError::from_code(status));
+        }
 
-// pub struct Connection {
-//     adapter: Arc<dyn ConnectionAdapter>,
-//     runtime: Arc<dyn Runtime>,
-// }
+        trace!("Status: OK. Response length: {}", length);
+        if length <= 1 {
+            return InboundResult::Response(Bytes::new());
+        }
 
-// impl Connection {
+        let body_len = length as usize;
+        let got = self.rx_buf.len() - RESPONSE_INITIAL_BYTES_LENGTH;
+        if got < body_len {
+            return InboundResult::Need(body_len - got);
+        }
 
-// }
+        let mut full = self.rx_buf.split_to(8 + body_len);
+        let body = full.split_off(8).freeze();
+        InboundResult::Response(body)
+    }
+}
diff --git a/core/sdk/src/proto/mod.rs b/core/sdk/src/proto/mod.rs
index fdacce0f..478f6fac 100644
--- a/core/sdk/src/proto/mod.rs
+++ b/core/sdk/src/proto/mod.rs
@@ -1,4 +1,6 @@
 pub mod connection;
+pub mod tcp_adapter;
+pub mod runtime;
 
 use std::collections::VecDeque;
 
@@ -16,4 +18,3 @@ impl IggyCore {
     }
 
 }
-
diff --git a/core/sdk/src/proto/runtime.rs b/core/sdk/src/proto/runtime.rs
new file mode 100644
index 00000000..7adb1b19
--- /dev/null
+++ b/core/sdk/src/proto/runtime.rs
@@ -0,0 +1,18 @@
+use std::{ops::{Deref, DerefMut}, pin::Pin};
+
+pub trait Runtime: Sync + Send + 'static {
+    type Mutex<T>: Lockable<T> + Send + Sync + 'static
+    where
+        T: Send + 'static;
+
+    fn mutex<T: Send + 'static>(&self, value: T) -> Self::Mutex<T>;
+}
+
+pub trait Lockable<T>: Send + Sync + 'static {
+    type Guard<'a>: Deref<Target = T> + DerefMut + 'a
+    where
+        Self: 'a,
+        T: 'a;
+
+    fn lock(&self) -> Pin<Box<dyn Future<Output = Self::Guard<'_>> + Send + 
'_>>;
+}
diff --git a/core/sdk/src/proto/send_buffer.rs 
b/core/sdk/src/proto/send_buffer.rs
new file mode 100644
index 00000000..60e0b7aa
--- /dev/null
+++ b/core/sdk/src/proto/send_buffer.rs
@@ -0,0 +1,85 @@
+use std::collections::VecDeque;
+
+use bytes::Bytes;
+use iggy_common::{ClientState, IggyDuration, IggyError, IggyTimestamp};
+use tracing::{info, trace};
+
+pub struct SendBuffer {
+    data: VecDeque<Bytes>
+}
+
+impl SendBuffer {
+    pub fn write(&mut self, payload: Bytes) {
+        self.data.push_back(payload);
+    }
+
+    pub fn poll_transmit(&mut self) -> Option<Bytes> {
+        self.data.pop_front()
+    }
+}
+
+pub struct IggyCore {
+    data: VecDeque<Bytes>,
+    state: ClientState,
+    connected_at: Option<IggyTimestamp>,
+    reconnect_interval: u64, // micros
+}
+
+impl IggyCore {
+    pub fn write(&mut self, payload: Bytes) {
+        self.data.push_back(payload);
+    }
+
+    /*
+    
+    // Псевдо-код адаптера:
+    loop {
+        if let Some(wait) = core.reconnect_wait() {
+            // тут — await, если надо (или thread::sleep)
+            sleep(wait.get_duration()).await;
+        }
+        match core.connect() {
+            Ok(_) => break,
+            Err(e) => {
+                // обработать ошибку
+                break;
+            }
+        }
+    }
+ */
+    pub fn connect(&mut self) -> Result<(), IggyError> {
+        match self.state {
+            ClientState::Shutdown => {
+                trace!("Cannot connect. Client is shutdown.");
+                return Err(IggyError::ClientShutdown);
+            }
+            ClientState::Connected | ClientState::Authenticating | 
ClientState::Authenticated => {
+                trace!("Client is already connected.");
+                return Ok(());
+            }
+            ClientState::Connecting => {
+                trace!("Client is already connecting.");
+                return Ok(());
+            }
+            _ => {}
+        };
+
+        self.state = ClientState::Connecting;
+        Ok(())
+    }
+
+    pub fn reconnect_wait(&self) -> Option<IggyDuration> {
+        if let Some(connected_at) = self.connected_at.as_ref() {
+            let now = IggyTimestamp::now();
+            let elapsed = now.as_micros() - connected_at.as_micros();
+            if elapsed < self.reconnect_interval {
+                Some(IggyDuration::from(self.reconnect_interval - elapsed))
+            } else {
+                None
+            }
+        } else {
+            None
+        }
+    }
+}
+
diff --git a/core/sdk/src/proto/tcp_adapter.rs 
b/core/sdk/src/proto/tcp_adapter.rs
new file mode 100644
index 00000000..122a14f5
--- /dev/null
+++ b/core/sdk/src/proto/tcp_adapter.rs
@@ -0,0 +1,18 @@
+use bytes::Bytes;
+use iggy_common::IggyError;
+
+use crate::proto::{connection::IggyCore, runtime::{Lockable, Runtime}};
+
+pub struct TCPAdapter<R: Runtime> {
+    rt: R,
+    core: R::Mutex<IggyCore>,
+}
+
+impl<R: Runtime> TCPAdapter<R> {
+    async fn send_raw(&self, code: u32, payload: Bytes) -> Result<Bytes, 
IggyError> {
+        let a = self.rt.mutex(payload);
+        let _ = a.lock().await;
+        
+        Ok(Bytes::new())
+    }
+}

Reply via email to