This is an automated email from the ASF dual-hosted git repository.
bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/draft-sans by this push:
new 853d3077 del
853d3077 is described below
commit 853d3077e89e544026ae85582ab273ce5f851119
Author: haze518 <[email protected]>
AuthorDate: Tue Jul 1 06:26:28 2025 +0600
del
---
core/sdk/src/clients/client.rs | 4 ++
core/sdk/src/proto/connection.rs | 105 ++++++++++++++++++++++++--------------
core/sdk/src/proto/mod.rs | 3 +-
core/sdk/src/proto/runtime.rs | 18 +++++++
core/sdk/src/proto/send_buffer.rs | 85 ++++++++++++++++++++++++++++++
core/sdk/src/proto/tcp_adapter.rs | 18 +++++++
6 files changed, 195 insertions(+), 38 deletions(-)
diff --git a/core/sdk/src/clients/client.rs b/core/sdk/src/clients/client.rs
index 4896e629..5c4aff2d 100644
--- a/core/sdk/src/clients/client.rs
+++ b/core/sdk/src/clients/client.rs
@@ -39,6 +39,10 @@ use tokio::time::sleep;
use tracing::log::warn;
use tracing::{debug, error, info};
+// pub struct IggyClientNew {
+// pub(crate)
+// }
+
/// The main client struct which implements all the `Client` traits and wraps
the underlying low-level client for the specific transport.
///
/// It also provides the additional builders for the standalone consumer,
consumer group, and producer.
diff --git a/core/sdk/src/proto/connection.rs b/core/sdk/src/proto/connection.rs
index 48821011..9cd11354 100644
--- a/core/sdk/src/proto/connection.rs
+++ b/core/sdk/src/proto/connection.rs
@@ -1,11 +1,22 @@
use std::{collections::VecDeque, pin::Pin, sync::Arc};
-use bytes::Bytes;
-use iggy_common::{ClientState, Command, IggyDuration, IggyError,
IggyTimestamp};
+use bytes::{Bytes, BytesMut};
+use iggy_common::{ClientState, Command, IggyDuration, IggyError,
IggyErrorDiscriminants, IggyTimestamp};
use std::io::IoSlice;
-use tracing::trace;
+use tracing::{error, trace};
+// Extra bytes that `poll_transmit` adds to `payload.len()` when it fills in the
+// request length header.
const REQUEST_INITIAL_BYTES_LENGTH: usize = 4;
+// Size of the response header read by `feed_inbound`: bytes 0..4 are parsed as a
+// little-endian u32 status and bytes 4..8 as a little-endian u32 body length.
+const RESPONSE_INITIAL_BYTES_LENGTH: usize = 8;
+// Non-zero statuses that `feed_inbound` logs at debug level instead of error level;
+// they are still returned to the caller as errors.
+const ALREADY_EXISTS_STATUSES: &[u32] = &[
+ IggyErrorDiscriminants::TopicIdAlreadyExists as u32,
+ IggyErrorDiscriminants::TopicNameAlreadyExists as u32,
+ IggyErrorDiscriminants::StreamIdAlreadyExists as u32,
+ IggyErrorDiscriminants::StreamNameAlreadyExists as u32,
+ IggyErrorDiscriminants::UserAlreadyExists as u32,
+ IggyErrorDiscriminants::PersonalAccessTokenAlreadyExists as u32,
+ IggyErrorDiscriminants::ConsumerGroupIdAlreadyExists as u32,
+ IggyErrorDiscriminants::ConsumerGroupNameAlreadyExists as u32,
+];
pub trait TransportConfig {
fn resstablish_after(&self) -> IggyDuration;
@@ -38,18 +49,24 @@ pub enum Order {
Noop,
}
+/// Outcome of handing received bytes to `IggyCore::feed_inbound`.
+pub enum InboundResult {
+ /// No complete response is buffered yet; carries the byte count that
+ /// `feed_inbound` reports as still wanted.
+ Need(usize),
+ /// A response with status 0; carries the response body (empty when the
+ /// announced length is 0 or 1).
+ Response(Bytes),
+ /// A response with a non-zero status, converted with `IggyError::from_code`.
+ Error(IggyError),
+}
+
+/// State shared by `write`, `poll_transmit` and `feed_inbound`: the client state,
+/// the queue of outgoing commands and the inbound byte buffer.
pub struct IggyCore {
state: ClientState,
last_connect: Option<IggyTimestamp>,
- reconnect_us: u64,
- pending: VecDeque<Box<dyn Command>>,
- config: Box<dyn TransportConfig>, // todo rewrite via generic
+ // Commands queued by `write` as (command code, serialized payload) and
+ // drained one at a time by `poll_transmit`.
+ pending: VecDeque<(u32 /* code */, Bytes /* payload */)>,
+ config: Arc<dyn TransportConfig + Send + Sync + 'static>, // todo rewrite
via generic
retry_count: u32,
+ // Frame most recently built by `poll_transmit`, if any.
current_tx: Option<TxBuf>,
+ // Bytes accumulated by `feed_inbound` until a response can be cut out.
+ rx_buf: BytesMut,
}
impl IggyCore {
- pub fn write(&mut self, cmd: impl Command) -> Result<_, IggyError> {
+ pub fn write(&mut self, cmd: impl Command) -> Result<(), IggyError> {
match self.state {
ClientState::Shutdown => {
trace!("Cannot send data. Client is shutdown.");
@@ -65,7 +82,7 @@ impl IggyCore {
}
_ => {}
}
- self.pending.push_back(Box::new(cmd));
+ self.pending.push_back((cmd.code(), cmd.to_bytes()));
Ok(())
}
@@ -125,46 +142,60 @@ impl IggyCore {
// TODO: call this from the async fn poll
+ /// Returns the frame that should currently be written to the transport.
+ ///
+ /// When no frame is stored in `current_tx`, the next `(code, payload)` pair is
+ /// popped from `pending` and turned into a `TxBuf` with a little-endian length
+ /// header, a little-endian command code and the payload. Returns `None` when no
+ /// frame is stored and `pending` is empty.
+ ///
+ /// NOTE(review): this method never clears `current_tx`; presumably the caller
+ /// resets it elsewhere once the frame has been written - confirm.
pub fn poll_transmit(&mut self) -> Option<&TxBuf> {
if self.current_tx.is_none() {
- let cmd = self.pending.pop_front()?;
- let payload = cmd.to_bytes();
+ let (code, payload) = self.pending.pop_front()?;
+ // The length header covers the payload plus REQUEST_INITIAL_BYTES_LENGTH bytes.
let len = (payload.len() + REQUEST_INITIAL_BYTES_LENGTH) as u32;
self.current_tx = Some(TxBuf {
hdr_len: len.to_le_bytes(),
- hdr_code: cmd.code().to_le_bytes(),
+ hdr_code: code.to_le_bytes(),
payload,
});
}
self.current_tx.as_ref()
}
-}
-
-// pub trait ConnectionAdapter: Send + Sync + 'static {
-// fn write(&mut self, buf: &[u8]) -> Pin<Box<dyn Future<Output =
Result<(), IggyError>> + Send>>;
-// }
-// pub trait Runtime {
-
-// }
-
-// // pin_project! {
-// pub struct OpenConn<'a> {
-// conn: &'a Connection,
-// }
-// // }
-
-// impl Future for OpenConn<'_> {
-// type Output = Result<Connection, IggyError>;
-// fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) ->
std::task::Poll<Self::Output> {
+    /// Feeds bytes received from the transport into the response parser.
+    ///
+    /// Every call appends `bytes` to the internal receive buffer and then tries to
+    /// cut one complete response out of it. A response starts with an 8-byte header
+    /// (little-endian `u32` status followed by a little-endian `u32` body length)
+    /// and is followed by `length` body bytes.
+    ///
+    /// Returns:
+    /// * `InboundResult::Need(n)` - the buffered data is incomplete and `n` more
+    ///   bytes are required (measured after `bytes` has been appended).
+    /// * `InboundResult::Error(e)` - the header carried a non-zero status; the
+    ///   header is removed from the buffer.
+    /// * `InboundResult::Response(body)` - status 0; the header and body are removed
+    ///   from the buffer. A length of 0 or 1 is reported as an empty body and only
+    ///   the header is consumed.
+    pub fn feed_inbound(&mut self, bytes: &[u8]) -> InboundResult {
+        // Always buffer the new bytes first. Previously they were appended only
+        // while the header was incomplete, so body bytes were silently dropped and
+        // a response with a body could never complete.
+        self.rx_buf.extend_from_slice(bytes);
+        if self.rx_buf.len() < RESPONSE_INITIAL_BYTES_LENGTH {
+            // Report what is still missing *after* the append, not before it.
+            return InboundResult::Need(RESPONSE_INITIAL_BYTES_LENGTH - self.rx_buf.len());
+        }
+
+        // The length check above guarantees both 4-byte slices exist.
+        let status = u32::from_le_bytes(self.rx_buf[0..4].try_into().unwrap());
+        let length = u32::from_le_bytes(self.rx_buf[4..8].try_into().unwrap());
+
+        if status != 0 {
+            if ALREADY_EXISTS_STATUSES.contains(&status) {
+                tracing::debug!(
+                    "Received a server resource already exists response: {} ({})",
+                    status,
+                    IggyError::from_code_as_string(status)
+                )
+            } else {
+                error!(
+                    "Received an invalid response with status: {} ({}).",
+                    status,
+                    IggyError::from_code_as_string(status),
+                );
+            }
+            // Consume the header; otherwise every later call would re-parse this
+            // same error instead of the next response.
+            let _ = self.rx_buf.split_to(RESPONSE_INITIAL_BYTES_LENGTH);
+            return InboundResult::Error(IggyError::from_code(status));
+        }
+
+        trace!("Status: OK. Response length: {}", length);
+        if length <= 1 {
+            // Empty response: drop the header so the buffer is ready for the next one.
+            let _ = self.rx_buf.split_to(RESPONSE_INITIAL_BYTES_LENGTH);
+            return InboundResult::Response(Bytes::new());
+        }
+
+        let body_len = length as usize;
+        let got = self.rx_buf.len() - RESPONSE_INITIAL_BYTES_LENGTH;
+        if got < body_len {
+            return InboundResult::Need(body_len - got);
+        }
+
+        // Detach header + body, leaving any bytes of a following response in place.
+        let mut frame = self.rx_buf.split_to(RESPONSE_INITIAL_BYTES_LENGTH + body_len);
+        let body = frame.split_off(RESPONSE_INITIAL_BYTES_LENGTH).freeze();
+        InboundResult::Response(body)
+    }
-// }
-// }
-// pub struct Connection {
-// adapter: Arc<dyn ConnectionAdapter>,
-// runtime: Arc<dyn Runtime>,
-// }
-// impl Connection {
-// }
+}
diff --git a/core/sdk/src/proto/mod.rs b/core/sdk/src/proto/mod.rs
index fdacce0f..478f6fac 100644
--- a/core/sdk/src/proto/mod.rs
+++ b/core/sdk/src/proto/mod.rs
@@ -1,4 +1,6 @@
pub mod connection;
+pub mod tcp_adapter;
+pub mod runtime;
use std::collections::VecDeque;
@@ -16,4 +18,3 @@ impl IggyCore {
}
}
-
diff --git a/core/sdk/src/proto/runtime.rs b/core/sdk/src/proto/runtime.rs
new file mode 100644
index 00000000..7adb1b19
--- /dev/null
+++ b/core/sdk/src/proto/runtime.rs
@@ -0,0 +1,18 @@
+use std::{ops::{Deref, DerefMut}, pin::Pin};
+
+/// Supplies runtime-specific primitives to the protocol adapters; the only
+/// primitive declared here is a mutex factory.
+pub trait Runtime: Sync + Send + 'static {
+ /// Mutex type handed out by this runtime; it must implement [`Lockable`].
+ type Mutex<T>: Lockable<T> + Send + Sync + 'static
+ where
+ T: Send + 'static;
+
+ /// Wraps `value` in a new mutex of this runtime's mutex type.
+ fn mutex<T: Send + 'static>(&self, value: T) -> Self::Mutex<T>;
+}
+
+/// A lock around a `T` whose acquisition is expressed as a future.
+pub trait Lockable<T>: Send + Sync + 'static {
+ /// Guard returned once the lock is held; it dereferences (also mutably) to `T`.
+ type Guard<'a>: Deref<Target = T> + DerefMut + 'a
+ where
+ Self: 'a,
+ T: 'a;
+
+ /// Returns a boxed, `Send` future that resolves to a guard borrowing `self`.
+ fn lock(&self) -> Pin<Box<dyn Future<Output = Self::Guard<'_>> + Send +
'_>>;
+}
diff --git a/core/sdk/src/proto/send_buffer.rs
b/core/sdk/src/proto/send_buffer.rs
new file mode 100644
index 00000000..60e0b7aa
--- /dev/null
+++ b/core/sdk/src/proto/send_buffer.rs
@@ -0,0 +1,85 @@
+use std::collections::VecDeque;
+
+use bytes::Bytes;
+use iggy_common::{ClientState, IggyDuration, IggyError, IggyTimestamp};
+use tracing::{info, trace};
+
+/// FIFO queue of outgoing payloads.
+///
+/// The field is private and the type has no constructor, so `Default` (an empty
+/// queue) is provided as the way to create one; `Debug` is derived for diagnostics.
+#[derive(Debug, Default)]
+pub struct SendBuffer {
+    // Payloads in the order they were written; the front is transmitted first.
+    data: VecDeque<Bytes>,
+}
+
+impl SendBuffer {
+ /// Appends `payload` to the back of the queue.
+ pub fn write(&mut self, payload: Bytes) {
+ self.data.push_back(payload);
+ }
+
+ /// Removes and returns the oldest queued payload, or `None` when the queue is empty.
+ pub fn poll_transmit(&mut self) -> Option<Bytes> {
+ self.data.pop_front()
+ }
+}
+
+/// Outgoing payload queue plus the connection state that `connect` and
+/// `reconnect_wait` operate on.
+pub struct IggyCore {
+ // Payloads queued by `write`.
+ data: VecDeque<Bytes>,
+ // Current client state; `connect` switches it to `Connecting`.
+ state: ClientState,
+ // Timestamp that `reconnect_wait` measures the elapsed time from, if set.
+ connected_at: Option<IggyTimestamp>,
+ reconnect_interval: u64, // micros
+}
+
+impl IggyCore {
+    /// Appends `payload` to the back of the outgoing queue.
+    pub fn write(&mut self, payload: Bytes) {
+        self.data.push_back(payload);
+    }
+
+    /*
+
+    // Adapter pseudo-code:
+    loop {
+        if let Some(wait) = core.reconnect_wait() {
+            // await here if needed (or thread::sleep)
+            sleep(wait.get_duration()).await;
+        }
+        match core.connect() {
+            Ok(_) => break,
+            Err(e) => {
+                // handle the error
+                break;
+            }
+        }
+    }
+    */
+    /// Moves the state machine to `ClientState::Connecting`.
+    ///
+    /// Returns `Ok(())` without changing anything when the client is already
+    /// connecting, connected, authenticating or authenticated.
+    ///
+    /// # Errors
+    ///
+    /// Returns `IggyError::ClientShutdown` when the client has been shut down.
+    pub fn connect(&mut self) -> Result<(), IggyError> {
+        match self.state {
+            ClientState::Shutdown => {
+                trace!("Cannot connect. Client is shutdown.");
+                return Err(IggyError::ClientShutdown);
+            }
+            ClientState::Connected | ClientState::Authenticating | ClientState::Authenticated => {
+                trace!("Client is already connected.");
+                return Ok(());
+            }
+            ClientState::Connecting => {
+                trace!("Client is already connecting.");
+                return Ok(());
+            }
+            _ => {}
+        };
+
+        self.state = ClientState::Connecting;
+        Ok(())
+    }
+
+    /// Returns how long the caller should still wait before reconnecting.
+    ///
+    /// Yields `None` when `connected_at` is unset or when at least
+    /// `reconnect_interval` microseconds have passed since it; otherwise yields the
+    /// remaining part of the interval.
+    pub fn reconnect_wait(&self) -> Option<IggyDuration> {
+        let connected_at = self.connected_at.as_ref()?;
+        // Timestamps are compared as plain integers, so a clock that steps backwards
+        // would make `now < connected_at`. Saturate at zero instead of underflowing
+        // (a panic in debug builds, a wrapped huge value in release builds).
+        let elapsed = IggyTimestamp::now()
+            .as_micros()
+            .saturating_sub(connected_at.as_micros());
+        if elapsed < self.reconnect_interval {
+            Some(IggyDuration::from(self.reconnect_interval - elapsed))
+        } else {
+            None
+        }
+    }
+}
+
diff --git a/core/sdk/src/proto/tcp_adapter.rs
b/core/sdk/src/proto/tcp_adapter.rs
new file mode 100644
index 00000000..122a14f5
--- /dev/null
+++ b/core/sdk/src/proto/tcp_adapter.rs
@@ -0,0 +1,18 @@
+use bytes::Bytes;
+use iggy_common::IggyError;
+
+use crate::proto::{connection::IggyCore, runtime::{Lockable, Runtime}};
+
+/// Adapter that pairs a [`Runtime`] with an `IggyCore` stored inside that
+/// runtime's mutex type.
+pub struct TCPAdapter<R: Runtime> {
+ // Runtime used to create mutexes.
+ rt: R,
+ // Protocol core behind the runtime's mutex.
+ core: R::Mutex<IggyCore>,
+}
+
+impl<R: Runtime> TCPAdapter<R> {
+ /// Placeholder for sending a raw command; currently always returns an empty
+ /// `Bytes` without touching `self.core`.
+ ///
+ /// NOTE(review): `code` is unused and the mutex locked below wraps `payload`,
+ /// not the core - this looks like scaffolding to be replaced; confirm.
+ async fn send_raw(&self, code: u32, payload: Bytes) -> Result<Bytes,
IggyError> {
+ // Wraps the payload in a fresh runtime mutex.
+ let a = self.rt.mutex(payload);
+ // Awaits the lock; the guard is discarded immediately.
+ let _ = a.lock().await;
+
+ Ok(Bytes::new())
+ }
+}