This is an automated email from the ASF dual-hosted git repository.
bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/draft-sans by this push:
new b1d6fa54 test
b1d6fa54 is described below
commit b1d6fa54bb71b627bcdec7fe6049e84affc0b9d2
Author: haze518 <[email protected]>
AuthorDate: Tue Jul 15 06:19:56 2025 +0600
test
---
Cargo.lock | 1 +
core/integration/Cargo.toml | 1 +
core/integration/src/tcp_client.rs | 8 +-
core/integration/tests/sdk/producer/mod.rs | 5 +-
core/sdk/src/connection/tcp/mod.rs | 2 +-
core/sdk/src/driver/tcp.rs | 145 +++++++++++++++++------------
core/sdk/src/proto/connection.rs | 51 +++++++++-
core/sdk/src/transport_adapter/async.rs | 9 +-
8 files changed, 151 insertions(+), 71 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 7bb6fafe..c9f41db2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4059,6 +4059,7 @@ dependencies = [
"ctor",
"derive_more 2.0.1",
"env_logger",
+ "flume",
"futures",
"humantime",
"iggy",
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index b17d3d0b..673ff64a 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -35,6 +35,7 @@ chrono = { workspace = true }
ctor = "0.4.2"
derive_more = { workspace = true }
env_logger = { workspace = true }
+flume.workspace = true
futures = { workspace = true }
humantime = { workspace = true }
iggy = { workspace = true }
diff --git a/core/integration/src/tcp_client.rs b/core/integration/src/tcp_client.rs
index 4a5f5491..96b90134 100644
--- a/core/integration/src/tcp_client.rs
+++ b/core/integration/src/tcp_client.rs
@@ -18,6 +18,7 @@
use crate::test_server::{ClientFactory, Transport};
use async_trait::async_trait;
+use bytes::Bytes;
use iggy::{connection::tcp::tcp::TokioTcpFactory, driver::tcp::TokioTcpDriver, prelude::{Client, IggyClient, TcpClient, TcpClientConfig}, proto::{connection::{IggyCore, IggyCoreConfig}, runtime::{sync, TokioRuntime}}, transport_adapter::r#async::AsyncTransportAdapter};
use std::sync::Arc;
@@ -40,8 +41,11 @@ impl ClientFactory for TcpClientFactory {
let core = Arc::new(sync::Mutex::new(IggyCore::new(IggyCoreConfig::default())));
let rt = Arc::new(TokioRuntime{});
let notify = Arc::new(sync::Notify::new());
- let dirver = TokioTcpDriver::new(core.clone(), rt.clone(), notify.clone(), tcp_factory.clone());
- let adapter = Box::new(AsyncTransportAdapter::new(tcp_factory, rt, core, dirver, notify));
+
+ let (tx, rx) = flume::bounded::<(u32, Bytes, u64)>(1);
+
+ let dirver = TokioTcpDriver::new(core.clone(), rt.clone(), notify.clone(), tcp_factory.clone(), rx);
+ let adapter = Box::new(AsyncTransportAdapter::new(tcp_factory, rt, core, dirver, notify, tx));
let client = IggyClient::create(adapter, None, None);
diff --git a/core/integration/tests/sdk/producer/mod.rs b/core/integration/tests/sdk/producer/mod.rs
index 7834d903..a90f1948 100644
--- a/core/integration/tests/sdk/producer/mod.rs
+++ b/core/integration/tests/sdk/producer/mod.rs
@@ -88,8 +88,9 @@ async fn test_async_send() {
let core = Arc::new(sync::Mutex::new(IggyCore::new(IggyCoreConfig::default())));
let rt: Arc<TokioRuntime> = Arc::new(TokioRuntime{});
let notify = Arc::new(sync::Notify::new());
- let dirver = TokioTcpDriver::new(core.clone(), rt.clone(), notify.clone(), tcp_factory.clone());
- let adapter = Box::new(AsyncTransportAdapter::new(tcp_factory, rt, core, dirver, notify));
+ let (tx, rx) = flume::bounded::<(u32, Bytes, u64)>(1);
+ let dirver = TokioTcpDriver::new(core.clone(), rt.clone(), notify.clone(), tcp_factory.clone(), rx);
+ let adapter = Box::new(AsyncTransportAdapter::new(tcp_factory, rt, core, dirver, notify, tx));
let client = IggyClient::create(adapter, None, None);
diff --git a/core/sdk/src/connection/tcp/mod.rs b/core/sdk/src/connection/tcp/mod.rs
index 5f733990..3a6a15d8 100644
--- a/core/sdk/src/connection/tcp/mod.rs
+++ b/core/sdk/src/connection/tcp/mod.rs
@@ -36,7 +36,7 @@ impl StreamPair for TokioTcpStream {
Box::pin(async move {
let mut w = self.writer.lock().await;
w.write_vectored(bufs).await.map_err(|_| IggyError::TcpError)?;
- // w.flush().await.map_err(|_| IggyError::TcpError)?;
+ w.flush().await.map_err(|_| IggyError::TcpError)?;
Ok(())
// for val in bufs {
// self.writer.write(val).await.map_err(|e| {
diff --git a/core/sdk/src/driver/tcp.rs b/core/sdk/src/driver/tcp.rs
index 818f226b..75046a96 100644
--- a/core/sdk/src/driver/tcp.rs
+++ b/core/sdk/src/driver/tcp.rs
@@ -1,42 +1,56 @@
-use std::{io::Cursor, sync::Arc};
+use std::{io::{Cursor, IoSlice}, sync::Arc};
use bytes::{Buf, Bytes, BytesMut};
use dashmap::DashMap;
use iggy_common::IggyError;
use tracing::error;
-use crate::{connection::{tcp::tcp::TokioTcpFactory, StreamPair}, driver::Driver, proto::{connection::{IggyCore, InboundResult}, runtime::{sync, Runtime}}};
+use crate::{
+ connection::{tcp::tcp::TokioTcpFactory, StreamPair},
+ driver::Driver,
+ proto::{
+ self, connection::{feed_inbound, IggyCore, InboundResult}, runtime::{sync, Runtime}
+ },
+};
#[derive(Debug)]
pub struct TokioTcpDriver<R>
where
- R: Runtime
+ R: Runtime,
{
core: Arc<sync::Mutex<IggyCore>>,
rt: Arc<R>,
notify: Arc<sync::Notify>,
factory: Arc<TokioTcpFactory>,
pending: Arc<DashMap<u64, sync::OneShotSender<Bytes>>>,
+ rx: flume::Receiver<(u32, Bytes, u64)>,
}
impl<R> TokioTcpDriver<R>
where
- R: Runtime
+ R: Runtime,
{
- pub fn new(core: Arc<sync::Mutex<IggyCore>>, runtime: Arc<R>, notify: Arc<sync::Notify>, factory: Arc<TokioTcpFactory>) -> Self {
+ pub fn new(
+ core: Arc<sync::Mutex<IggyCore>>,
+ runtime: Arc<R>,
+ notify: Arc<sync::Notify>,
+ factory: Arc<TokioTcpFactory>,
+ rx: flume::Receiver<(u32, Bytes, u64)>,
+ ) -> Self {
Self {
core,
rt: runtime,
notify,
factory,
pending: Arc::new(DashMap::new()),
+ rx,
}
}
}
impl<R> Driver for TokioTcpDriver<R>
where
- R: Runtime
+ R: Runtime,
{
fn start(&self) {
let rt = self.rt.clone();
@@ -44,76 +58,85 @@ where
let core = self.core.clone();
let factory = self.factory.clone();
let pending = self.pending.clone();
+ let rx = self.rx.clone();
rt.spawn(Box::pin(async move {
let mut rx_buf = BytesMut::new();
loop {
- nt.notified().await;
+ // nt.notified().await;
// TODO убирать txBuf, если не удается его прочитать и отсылать ошибку
- while let Some(data) = {
- let mut guard = core.lock().await;
- guard.poll_transmit()
- } {
- if !pending.contains_key(&data.id) {
- error!("Failed to get transport adapter id");
- continue;
- }
+ match rx.recv_async().await {
+ Ok(data) => {
+ let (code, payload, id) = data;
+ if !pending.contains_key(&id) {
+ error!("Failed to get transport adapter id");
+ continue;
+ }
- let stream = match factory.stream.lock().await.clone() {
- Some(s) => s,
- None => { error!("Not connected"); break }
- };
+ let stream = match factory.stream.lock().await.clone() {
+ Some(s) => s,
+ None => {
+ error!("Not connected");
+ break;
+ }
+ };
- if let Err(e) = stream.send_vectored(&data.as_slices()).await {
- error!("Failed to send vectored: {e}");
- continue;
- }
+ let payload_len = (payload.len() + proto::connection::REQUEST_INITIAL_BYTES_LENGTH).to_le_bytes();
+ let code = code.to_le_bytes();
+ let io_slices = [
+ IoSlice::new(&payload_len),
+ IoSlice::new(&code),
+ IoSlice::new(&payload),
+ ];
+ if let Err(e) = stream.send_vectored(&io_slices).await {
+ error!("Failed to send vectored: {e}");
+ continue;
+ }
- let init_len = core.lock().await.initial_bytes_len();
- let mut at_most = init_len;
- loop {
- rx_buf.reserve(at_most);
+ let init_len = proto::connection::REQUEST_INITIAL_BYTES_LENGTH;
+ let mut at_most = init_len;
+ loop {
+ rx_buf.reserve(at_most);
- match stream.read_buf(&mut rx_buf).await {
- Ok(0) => {
- error!("EOF before header/body");
- panic!("EOF before header/body");
- break
- }
- Ok(n) => n,
- Err(e) => {
- error!("read_buf failed: {e}");
- panic!("read_buf failed: {e}");
- break
- }
- };
+ match stream.read_buf(&mut rx_buf).await {
+ Ok(0) => {
+ error!("EOF before header/body");
+ panic!("EOF before header/body");
+ break;
+ }
+ Ok(n) => n,
+ Err(e) => {
+ error!("read_buf failed: {e}");
+ panic!("read_buf failed: {e}");
+ break;
+ }
+ };
- let inbound = {
- let mut guard = core.lock().await;
- guard.feed_inbound(&rx_buf[..])
- };
+ let inbound = feed_inbound(&rx_buf[..]);
- match inbound {
- InboundResult::Need(need) => at_most = need,
- InboundResult::Ready(start, end) => {
- rx_buf.advance(start);
- let frame = rx_buf.split_to(end - start).freeze();
- if let Some((_k, tx)) = pending.remove(&data.id) {
- let _ = tx.send(frame);
+ match inbound {
+ InboundResult::Need(need) => at_most = need,
+ InboundResult::Ready(start, end) => {
+ rx_buf.advance(start);
+ let frame = rx_buf.split_to(end - start).freeze();
+ if let Some((_k, tx)) = pending.remove(&id) {
+ let _ = tx.send(frame);
+ }
+ // core.try_lock().unwrap().mark_tx_done();
+ at_most = init_len;
+ rx_buf.clear();
+ break;
+ }
+ InboundResult::Error(e) => {
+ pending.remove(&id);
+ // core.lock().await.mark_tx_done();
+ panic!("read_buf failed: {e}");
+ break;
}
- core.try_lock().unwrap().mark_tx_done();
- at_most = init_len;
- rx_buf.clear();
- break;
- }
- InboundResult::Error(e) => {
- pending.remove(&data.id);
- core.lock().await.mark_tx_done();
- panic!("read_buf failed: {e}");
- break;
}
}
}
+ Err(e) => {}
}
}
}));
diff --git a/core/sdk/src/proto/connection.rs b/core/sdk/src/proto/connection.rs
index 62e86264..509986fe 100644
--- a/core/sdk/src/proto/connection.rs
+++ b/core/sdk/src/proto/connection.rs
@@ -5,7 +5,7 @@ use iggy_common::{ClientState, Command, IggyDuration, IggyError, IggyErrorDiscri
use std::io::IoSlice;
use tracing::{error, trace};
-const REQUEST_INITIAL_BYTES_LENGTH: usize = 4;
+pub const REQUEST_INITIAL_BYTES_LENGTH: usize = 4;
const RESPONSE_INITIAL_BYTES_LENGTH: usize = 8;
const ALREADY_EXISTS_STATUSES: &[u32] = &[
IggyErrorDiscriminants::TopicIdAlreadyExists as u32,
@@ -183,7 +183,7 @@ impl IggyCore {
RESPONSE_INITIAL_BYTES_LENGTH
}
- pub fn feed_inbound(&mut self, cur: &[u8]) -> InboundResult {
+ pub fn feed_inbound(&self, cur: &[u8]) -> InboundResult {
let buf_len = cur.len();
if buf_len < RESPONSE_INITIAL_BYTES_LENGTH {
return InboundResult::Need(RESPONSE_INITIAL_BYTES_LENGTH - buf_len);
@@ -239,3 +239,50 @@ impl IggyCore {
self.state = ClientState::Disconnected;
}
}
+
+pub fn feed_inbound(cur: &[u8]) -> InboundResult {
+ let buf_len = cur.len();
+ if buf_len < RESPONSE_INITIAL_BYTES_LENGTH {
+ return InboundResult::Need(RESPONSE_INITIAL_BYTES_LENGTH - buf_len);
+ }
+
+ let status = match cur[..4].try_into() {
+ Ok(bytes) => u32::from_le_bytes(bytes),
+ Err(_) => return InboundResult::Error(IggyError::InvalidNumberEncoding),
+ };
+
+ let length = match cur[4..8].try_into() {
+ Ok(bytes) => u32::from_le_bytes(bytes),
+ Err(_) => return InboundResult::Error(IggyError::InvalidNumberEncoding),
+ };
+
+ if status != 0 {
+ if ALREADY_EXISTS_STATUSES.contains(&status) {
+ tracing::debug!(
+ "Received a server resource already exists response: {} ({})",
+ status,
+ IggyError::from_code_as_string(status)
+ )
+ } else {
+ error!(
+ "Received an invalid response with status: {} ({}).",
+ status,
+ IggyError::from_code_as_string(status),
+ );
+ }
+ return InboundResult::Error(IggyError::from_code(status));
+ }
+
+ trace!("Status: OK. Response length: {}", length);
+ if length <= 1 {
+ return InboundResult::Ready(0, 0);
+ }
+
+ let total = RESPONSE_INITIAL_BYTES_LENGTH + length as usize;
+ if buf_len < total {
+ return InboundResult::Need(total - buf_len);
+ }
+
+ InboundResult::Ready(8, total)
+}
+
diff --git a/core/sdk/src/transport_adapter/async.rs b/core/sdk/src/transport_adapter/async.rs
index d5602944..dfda5105 100644
--- a/core/sdk/src/transport_adapter/async.rs
+++ b/core/sdk/src/transport_adapter/async.rs
@@ -29,6 +29,7 @@ pub struct AsyncTransportAdapter<F: ConnectionFactory, R: Runtime, D: Driver> {
id: AtomicU64,
driver: Arc<D>,
events: (Sender<DiagnosticEvent>, Receiver<DiagnosticEvent>),
+ tx: flume::Sender<(u32, Bytes, u64),>
}
impl<F, R, D> AsyncTransportAdapter<F, R, D>
@@ -37,7 +38,7 @@ where
R: Runtime + Send + Sync + 'static,
D: Driver + Send + Sync,
{
- pub fn new(factory: Arc<F>, runtime: Arc<R>, core: Arc<sync::Mutex<IggyCore>>, driver: D, notify: Arc<Notify>) -> Self {
+ pub fn new(factory: Arc<F>, runtime: Arc<R>, core: Arc<sync::Mutex<IggyCore>>, driver: D, notify: Arc<Notify>, tx: flume::Sender<(u32, Bytes, u64)>) -> Self {
driver.start();
Self {
factory: factory,
@@ -47,6 +48,7 @@ where
id: AtomicU64::new(0),
driver: Arc::new(driver),
events: broadcast(1000),
+ tx,
}
}
// async fn send_with_response<T: Command>(&self, command: &T) -> Result<RespFut, IggyError> {
@@ -142,9 +144,10 @@ where
let (tx, rx) = runtime::oneshot::<Bytes>();
let current_id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
- self.core.lock().await.write(code, payload, current_id)?;
+ // self.core.lock().await.write(code, payload, current_id)?;
self.driver.register(current_id, tx);
- self.notify.notify_waiters();
+ self.tx.send_async((code, payload, current_id)).await;
+ // self.notify.notify_waiters();
let resp = RespFut{rx};
resp.await