This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch draft-sans in repository https://gitbox.apache.org/repos/asf/iggy.git
commit b1d6fa54bb71b627bcdec7fe6049e84affc0b9d2 Author: haze518 <[email protected]> AuthorDate: Tue Jul 15 06:19:56 2025 +0600 test --- Cargo.lock | 1 + core/integration/Cargo.toml | 1 + core/integration/src/tcp_client.rs | 8 +- core/integration/tests/sdk/producer/mod.rs | 5 +- core/sdk/src/connection/tcp/mod.rs | 2 +- core/sdk/src/driver/tcp.rs | 145 +++++++++++++++++------------ core/sdk/src/proto/connection.rs | 51 +++++++++- core/sdk/src/transport_adapter/async.rs | 9 +- 8 files changed, 151 insertions(+), 71 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7bb6fafe..c9f41db2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4059,6 +4059,7 @@ dependencies = [ "ctor", "derive_more 2.0.1", "env_logger", + "flume", "futures", "humantime", "iggy", diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml index b17d3d0b..673ff64a 100644 --- a/core/integration/Cargo.toml +++ b/core/integration/Cargo.toml @@ -35,6 +35,7 @@ chrono = { workspace = true } ctor = "0.4.2" derive_more = { workspace = true } env_logger = { workspace = true } +flume.workspace = true futures = { workspace = true } humantime = { workspace = true } iggy = { workspace = true } diff --git a/core/integration/src/tcp_client.rs b/core/integration/src/tcp_client.rs index 4a5f5491..96b90134 100644 --- a/core/integration/src/tcp_client.rs +++ b/core/integration/src/tcp_client.rs @@ -18,6 +18,7 @@ use crate::test_server::{ClientFactory, Transport}; use async_trait::async_trait; +use bytes::Bytes; use iggy::{connection::tcp::tcp::TokioTcpFactory, driver::tcp::TokioTcpDriver, prelude::{Client, IggyClient, TcpClient, TcpClientConfig}, proto::{connection::{IggyCore, IggyCoreConfig}, runtime::{sync, TokioRuntime}}, transport_adapter::r#async::AsyncTransportAdapter}; use std::sync::Arc; @@ -40,8 +41,11 @@ impl ClientFactory for TcpClientFactory { let core = Arc::new(sync::Mutex::new(IggyCore::new(IggyCoreConfig::default()))); let rt = Arc::new(TokioRuntime{}); let notify = Arc::new(sync::Notify::new()); - let dirver = TokioTcpDriver::new(core.clone(), rt.clone(), notify.clone(), tcp_factory.clone()); - let adapter = Box::new(AsyncTransportAdapter::new(tcp_factory, rt, core, dirver, notify)); + + let (tx, rx) = flume::bounded::<(u32, Bytes, u64)>(1); + + let dirver = TokioTcpDriver::new(core.clone(), rt.clone(), notify.clone(), tcp_factory.clone(), rx); + let adapter = Box::new(AsyncTransportAdapter::new(tcp_factory, rt, core, dirver, notify, tx)); let client = IggyClient::create(adapter, None, None); diff --git a/core/integration/tests/sdk/producer/mod.rs b/core/integration/tests/sdk/producer/mod.rs index 7834d903..a90f1948 100644 --- a/core/integration/tests/sdk/producer/mod.rs +++ b/core/integration/tests/sdk/producer/mod.rs @@ -88,8 +88,9 @@ async fn test_async_send() { let core = Arc::new(sync::Mutex::new(IggyCore::new(IggyCoreConfig::default()))); let rt: Arc<TokioRuntime> = Arc::new(TokioRuntime{}); let notify = Arc::new(sync::Notify::new()); - let dirver = TokioTcpDriver::new(core.clone(), rt.clone(), notify.clone(), tcp_factory.clone()); - let adapter = Box::new(AsyncTransportAdapter::new(tcp_factory, rt, core, dirver, notify)); + let (tx, rx) = flume::bounded::<(u32, Bytes, u64)>(1); + let dirver = TokioTcpDriver::new(core.clone(), rt.clone(), notify.clone(), tcp_factory.clone(), rx); + let adapter = Box::new(AsyncTransportAdapter::new(tcp_factory, rt, core, dirver, notify, tx)); let client = IggyClient::create(adapter, None, None); diff --git a/core/sdk/src/connection/tcp/mod.rs b/core/sdk/src/connection/tcp/mod.rs index 5f733990..3a6a15d8 100644 --- a/core/sdk/src/connection/tcp/mod.rs +++ b/core/sdk/src/connection/tcp/mod.rs @@ -36,7 +36,7 @@ impl StreamPair for TokioTcpStream { Box::pin(async move { let mut w = self.writer.lock().await; w.write_vectored(bufs).await.map_err(|_| IggyError::TcpError)?; - // w.flush().await.map_err(|_| IggyError::TcpError)?; + w.flush().await.map_err(|_| IggyError::TcpError)?; Ok(()) // for val in bufs { // self.writer.write(val).await.map_err(|e| { diff --git a/core/sdk/src/driver/tcp.rs b/core/sdk/src/driver/tcp.rs index 818f226b..75046a96 100644 --- a/core/sdk/src/driver/tcp.rs +++ b/core/sdk/src/driver/tcp.rs @@ -1,42 +1,56 @@ -use std::{io::Cursor, sync::Arc}; +use std::{io::{Cursor, IoSlice}, sync::Arc}; use bytes::{Buf, Bytes, BytesMut}; use dashmap::DashMap; use iggy_common::IggyError; use tracing::error; -use crate::{connection::{tcp::tcp::TokioTcpFactory, StreamPair}, driver::Driver, proto::{connection::{IggyCore, InboundResult}, runtime::{sync, Runtime}}}; +use crate::{ + connection::{tcp::tcp::TokioTcpFactory, StreamPair}, + driver::Driver, + proto::{ + self, connection::{feed_inbound, IggyCore, InboundResult}, runtime::{sync, Runtime} + }, +}; #[derive(Debug)] pub struct TokioTcpDriver<R> where - R: Runtime + R: Runtime, { core: Arc<sync::Mutex<IggyCore>>, rt: Arc<R>, notify: Arc<sync::Notify>, factory: Arc<TokioTcpFactory>, pending: Arc<DashMap<u64, sync::OneShotSender<Bytes>>>, + rx: flume::Receiver<(u32, Bytes, u64)>, } impl<R> TokioTcpDriver<R> where - R: Runtime + R: Runtime, { - pub fn new(core: Arc<sync::Mutex<IggyCore>>, runtime: Arc<R>, notify: Arc<sync::Notify>, factory: Arc<TokioTcpFactory>) -> Self { + pub fn new( + core: Arc<sync::Mutex<IggyCore>>, + runtime: Arc<R>, + notify: Arc<sync::Notify>, + factory: Arc<TokioTcpFactory>, + rx: flume::Receiver<(u32, Bytes, u64)>, + ) -> Self { Self { core, rt: runtime, notify, factory, pending: Arc::new(DashMap::new()), + rx, } } } impl<R> Driver for TokioTcpDriver<R> where - R: Runtime + R: Runtime, { fn start(&self) { let rt = self.rt.clone(); @@ -44,76 +58,85 @@ where let core = self.core.clone(); let factory = self.factory.clone(); let pending = self.pending.clone(); + let rx = self.rx.clone(); rt.spawn(Box::pin(async move { let mut rx_buf = BytesMut::new(); loop { - nt.notified().await; + // nt.notified().await; // TODO убирать txBuf, если не удается его прочитать и отсылать ошибку - while let Some(data) = { - let mut guard = core.lock().await; - guard.poll_transmit() - } { - if !pending.contains_key(&data.id) { - error!("Failed to get transport adapter id"); - continue; - } + match rx.recv_async().await { + Ok(data) => { + let (code, payload, id) = data; + if !pending.contains_key(&id) { + error!("Failed to get transport adapter id"); + continue; + } - let stream = match factory.stream.lock().await.clone() { - Some(s) => s, - None => { error!("Not connected"); break } - }; + let stream = match factory.stream.lock().await.clone() { + Some(s) => s, + None => { + error!("Not connected"); + break; + } + }; - if let Err(e) = stream.send_vectored(&data.as_slices()).await { - error!("Failed to send vectored: {e}"); - continue; - } + let payload_len = (payload.len() + proto::connection::REQUEST_INITIAL_BYTES_LENGTH).to_le_bytes(); + let code = code.to_le_bytes(); + let io_slices = [ + IoSlice::new(&payload_len), + IoSlice::new(&code), + IoSlice::new(&payload), + ]; + if let Err(e) = stream.send_vectored(&io_slices).await { + error!("Failed to send vectored: {e}"); + continue; + } - let init_len = core.lock().await.initial_bytes_len(); - let mut at_most = init_len; - loop { - rx_buf.reserve(at_most); + let init_len = proto::connection::REQUEST_INITIAL_BYTES_LENGTH; + let mut at_most = init_len; + loop { + rx_buf.reserve(at_most); - match stream.read_buf(&mut rx_buf).await { - Ok(0) => { - error!("EOF before header/body"); - panic!("EOF before header/body"); - break - } - Ok(n) => n, - Err(e) => { - error!("read_buf failed: {e}"); - panic!("read_buf failed: {e}"); - break - } - }; + match stream.read_buf(&mut rx_buf).await { + Ok(0) => { + error!("EOF before header/body"); + panic!("EOF before header/body"); + break; + } + Ok(n) => n, + Err(e) => { + error!("read_buf failed: {e}"); + panic!("read_buf failed: {e}"); + break; + } + }; - let inbound = { - let mut guard = core.lock().await; - guard.feed_inbound(&rx_buf[..]) - }; + let inbound = feed_inbound(&rx_buf[..]); - match inbound { - InboundResult::Need(need) => at_most = need, - InboundResult::Ready(start, end) => { - rx_buf.advance(start); - let frame = rx_buf.split_to(end - start).freeze(); - if let Some((_k, tx)) = pending.remove(&data.id) { - let _ = tx.send(frame); + match inbound { + InboundResult::Need(need) => at_most = need, + InboundResult::Ready(start, end) => { + rx_buf.advance(start); + let frame = rx_buf.split_to(end - start).freeze(); + if let Some((_k, tx)) = pending.remove(&id) { + let _ = tx.send(frame); + } + // core.try_lock().unwrap().mark_tx_done(); + at_most = init_len; + rx_buf.clear(); + break; + } + InboundResult::Error(e) => { + pending.remove(&id); + // core.lock().await.mark_tx_done(); + panic!("read_buf failed: {e}"); + break; } - core.try_lock().unwrap().mark_tx_done(); - at_most = init_len; - rx_buf.clear(); - break; - } - InboundResult::Error(e) => { - pending.remove(&data.id); - core.lock().await.mark_tx_done(); - panic!("read_buf failed: {e}"); - break; } } } + Err(e) => {} } } })); diff --git a/core/sdk/src/proto/connection.rs b/core/sdk/src/proto/connection.rs index 62e86264..509986fe 100644 --- a/core/sdk/src/proto/connection.rs +++ b/core/sdk/src/proto/connection.rs @@ -5,7 +5,7 @@ use iggy_common::{ClientState, Command, IggyDuration, IggyError, IggyErrorDiscri use std::io::IoSlice; use tracing::{error, trace}; -const REQUEST_INITIAL_BYTES_LENGTH: usize = 4; +pub const REQUEST_INITIAL_BYTES_LENGTH: usize = 4; const RESPONSE_INITIAL_BYTES_LENGTH: usize = 8; const ALREADY_EXISTS_STATUSES: &[u32] = &[ IggyErrorDiscriminants::TopicIdAlreadyExists as u32, @@ -183,7 +183,7 @@ impl IggyCore { RESPONSE_INITIAL_BYTES_LENGTH } - pub fn feed_inbound(&mut self, cur: &[u8]) -> InboundResult { + pub fn feed_inbound(&self, cur: &[u8]) -> InboundResult { let buf_len = cur.len(); if buf_len < RESPONSE_INITIAL_BYTES_LENGTH { return InboundResult::Need(RESPONSE_INITIAL_BYTES_LENGTH - buf_len); @@ -239,3 +239,50 @@ impl IggyCore { self.state = ClientState::Disconnected; } } + +pub fn feed_inbound(cur: &[u8]) -> InboundResult { + let buf_len = cur.len(); + if buf_len < RESPONSE_INITIAL_BYTES_LENGTH { + return InboundResult::Need(RESPONSE_INITIAL_BYTES_LENGTH - buf_len); + } + + let status = match cur[..4].try_into() { + Ok(bytes) => u32::from_le_bytes(bytes), + Err(_) => return InboundResult::Error(IggyError::InvalidNumberEncoding), + }; + + let length = match cur[4..8].try_into() { + Ok(bytes) => u32::from_le_bytes(bytes), + Err(_) => return InboundResult::Error(IggyError::InvalidNumberEncoding), + }; + + if status != 0 { + if ALREADY_EXISTS_STATUSES.contains(&status) { + tracing::debug!( + "Received a server resource already exists response: {} ({})", + status, + IggyError::from_code_as_string(status) + ) + } else { + error!( + "Received an invalid response with status: {} ({}).", + status, + IggyError::from_code_as_string(status), + ); + } + return InboundResult::Error(IggyError::from_code(status)); + } + + trace!("Status: OK. Response length: {}", length); + if length <= 1 { + return InboundResult::Ready(0, 0); + } + + let total = RESPONSE_INITIAL_BYTES_LENGTH + length as usize; + if buf_len < total { + return InboundResult::Need(total - buf_len); + } + + InboundResult::Ready(8, total) +} + diff --git a/core/sdk/src/transport_adapter/async.rs b/core/sdk/src/transport_adapter/async.rs index d5602944..dfda5105 100644 --- a/core/sdk/src/transport_adapter/async.rs +++ b/core/sdk/src/transport_adapter/async.rs @@ -29,6 +29,7 @@ pub struct AsyncTransportAdapter<F: ConnectionFactory, R: Runtime, D: Driver> { id: AtomicU64, driver: Arc<D>, events: (Sender<DiagnosticEvent>, Receiver<DiagnosticEvent>), + tx: flume::Sender<(u32, Bytes, u64),> } impl<F, R, D> AsyncTransportAdapter<F, R, D> @@ -37,7 +38,7 @@ where R: Runtime + Send + Sync + 'static, D: Driver + Send + Sync, { - pub fn new(factory: Arc<F>, runtime: Arc<R>, core: Arc<sync::Mutex<IggyCore>>, driver: D, notify: Arc<Notify>) -> Self { + pub fn new(factory: Arc<F>, runtime: Arc<R>, core: Arc<sync::Mutex<IggyCore>>, driver: D, notify: Arc<Notify>, tx: flume::Sender<(u32, Bytes, u64)>) -> Self { driver.start(); Self { factory: factory, @@ -47,6 +48,7 @@ where id: AtomicU64::new(0), driver: Arc::new(driver), events: broadcast(1000), + tx, } } // async fn send_with_response<T: Command>(&self, command: &T) -> Result<RespFut, IggyError> { @@ -142,9 +144,10 @@ where let (tx, rx) = runtime::oneshot::<Bytes>(); let current_id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - self.core.lock().await.write(code, payload, current_id)?; + // self.core.lock().await.write(code, payload, current_id)?; self.driver.register(current_id, tx); - self.notify.notify_waiters(); + self.tx.send_async((code, payload, current_id)).await; + // self.notify.notify_waiters(); let resp = RespFut{rx}; resp.await
