This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/draft-sans by this push:
     new b1d6fa54 test
b1d6fa54 is described below

commit b1d6fa54bb71b627bcdec7fe6049e84affc0b9d2
Author: haze518 <[email protected]>
AuthorDate: Tue Jul 15 06:19:56 2025 +0600

    test
---
 Cargo.lock                                 |   1 +
 core/integration/Cargo.toml                |   1 +
 core/integration/src/tcp_client.rs         |   8 +-
 core/integration/tests/sdk/producer/mod.rs |   5 +-
 core/sdk/src/connection/tcp/mod.rs         |   2 +-
 core/sdk/src/driver/tcp.rs                 | 145 +++++++++++++++++------------
 core/sdk/src/proto/connection.rs           |  51 +++++++++-
 core/sdk/src/transport_adapter/async.rs    |   9 +-
 8 files changed, 151 insertions(+), 71 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 7bb6fafe..c9f41db2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4059,6 +4059,7 @@ dependencies = [
  "ctor",
  "derive_more 2.0.1",
  "env_logger",
+ "flume",
  "futures",
  "humantime",
  "iggy",
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index b17d3d0b..673ff64a 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -35,6 +35,7 @@ chrono = { workspace = true }
 ctor = "0.4.2"
 derive_more = { workspace = true }
 env_logger = { workspace = true }
+flume.workspace = true
 futures = { workspace = true }
 humantime = { workspace = true }
 iggy = { workspace = true }
diff --git a/core/integration/src/tcp_client.rs b/core/integration/src/tcp_client.rs
index 4a5f5491..96b90134 100644
--- a/core/integration/src/tcp_client.rs
+++ b/core/integration/src/tcp_client.rs
@@ -18,6 +18,7 @@
 
 use crate::test_server::{ClientFactory, Transport};
 use async_trait::async_trait;
+use bytes::Bytes;
 use iggy::{connection::tcp::tcp::TokioTcpFactory, driver::tcp::TokioTcpDriver, 
prelude::{Client, IggyClient, TcpClient, TcpClientConfig}, 
proto::{connection::{IggyCore, IggyCoreConfig}, runtime::{sync, TokioRuntime}}, 
transport_adapter::r#async::AsyncTransportAdapter};
 use std::sync::Arc;
 
@@ -40,8 +41,11 @@ impl ClientFactory for TcpClientFactory {
         let core = 
Arc::new(sync::Mutex::new(IggyCore::new(IggyCoreConfig::default())));
         let rt = Arc::new(TokioRuntime{});
         let notify = Arc::new(sync::Notify::new());
-        let dirver = TokioTcpDriver::new(core.clone(), rt.clone(), 
notify.clone(), tcp_factory.clone());
-        let adapter = Box::new(AsyncTransportAdapter::new(tcp_factory, rt, 
core, dirver, notify));
+    
+        let (tx, rx) = flume::bounded::<(u32, Bytes, u64)>(1);
+
+        let dirver = TokioTcpDriver::new(core.clone(), rt.clone(), 
notify.clone(), tcp_factory.clone(), rx);
+        let adapter = Box::new(AsyncTransportAdapter::new(tcp_factory, rt, 
core, dirver, notify, tx));
 
         let client = IggyClient::create(adapter, None, None);
 
diff --git a/core/integration/tests/sdk/producer/mod.rs b/core/integration/tests/sdk/producer/mod.rs
index 7834d903..a90f1948 100644
--- a/core/integration/tests/sdk/producer/mod.rs
+++ b/core/integration/tests/sdk/producer/mod.rs
@@ -88,8 +88,9 @@ async fn test_async_send() {
     let core = 
Arc::new(sync::Mutex::new(IggyCore::new(IggyCoreConfig::default())));
     let rt: Arc<TokioRuntime> = Arc::new(TokioRuntime{});
     let notify = Arc::new(sync::Notify::new());
-    let dirver = TokioTcpDriver::new(core.clone(), rt.clone(), notify.clone(), 
tcp_factory.clone());
-    let adapter = Box::new(AsyncTransportAdapter::new(tcp_factory, rt, core, 
dirver, notify));
+    let (tx, rx) = flume::bounded::<(u32, Bytes, u64)>(1);
+    let dirver = TokioTcpDriver::new(core.clone(), rt.clone(), notify.clone(), 
tcp_factory.clone(), rx);
+    let adapter = Box::new(AsyncTransportAdapter::new(tcp_factory, rt, core, 
dirver, notify, tx));
 
     let client = IggyClient::create(adapter, None, None);
 
diff --git a/core/sdk/src/connection/tcp/mod.rs b/core/sdk/src/connection/tcp/mod.rs
index 5f733990..3a6a15d8 100644
--- a/core/sdk/src/connection/tcp/mod.rs
+++ b/core/sdk/src/connection/tcp/mod.rs
@@ -36,7 +36,7 @@ impl StreamPair for TokioTcpStream {
         Box::pin(async move {
             let mut w = self.writer.lock().await;
             w.write_vectored(bufs).await.map_err(|_| IggyError::TcpError)?;
-            // w.flush().await.map_err(|_| IggyError::TcpError)?;
+            w.flush().await.map_err(|_| IggyError::TcpError)?;
             Ok(())
             // for val in bufs {
             //     self.writer.write(val).await.map_err(|e| {
diff --git a/core/sdk/src/driver/tcp.rs b/core/sdk/src/driver/tcp.rs
index 818f226b..75046a96 100644
--- a/core/sdk/src/driver/tcp.rs
+++ b/core/sdk/src/driver/tcp.rs
@@ -1,42 +1,56 @@
-use std::{io::Cursor, sync::Arc};
+use std::{io::{Cursor, IoSlice}, sync::Arc};
 
 use bytes::{Buf, Bytes, BytesMut};
 use dashmap::DashMap;
 use iggy_common::IggyError;
 use tracing::error;
 
-use crate::{connection::{tcp::tcp::TokioTcpFactory, StreamPair}, 
driver::Driver, proto::{connection::{IggyCore, InboundResult}, runtime::{sync, 
Runtime}}};
+use crate::{
+    connection::{tcp::tcp::TokioTcpFactory, StreamPair},
+    driver::Driver,
+    proto::{
+        self, connection::{feed_inbound, IggyCore, InboundResult}, 
runtime::{sync, Runtime}
+    },
+};
 
 #[derive(Debug)]
 pub struct TokioTcpDriver<R>
 where
-    R: Runtime
+    R: Runtime,
 {
     core: Arc<sync::Mutex<IggyCore>>,
     rt: Arc<R>,
     notify: Arc<sync::Notify>,
     factory: Arc<TokioTcpFactory>,
     pending: Arc<DashMap<u64, sync::OneShotSender<Bytes>>>,
+    rx: flume::Receiver<(u32, Bytes, u64)>,
 }
 
 impl<R> TokioTcpDriver<R>
 where
-    R: Runtime
+    R: Runtime,
 {
-    pub fn new(core: Arc<sync::Mutex<IggyCore>>, runtime: Arc<R>, notify: 
Arc<sync::Notify>, factory: Arc<TokioTcpFactory>) -> Self {
+    pub fn new(
+        core: Arc<sync::Mutex<IggyCore>>,
+        runtime: Arc<R>,
+        notify: Arc<sync::Notify>,
+        factory: Arc<TokioTcpFactory>,
+        rx: flume::Receiver<(u32, Bytes, u64)>,
+    ) -> Self {
         Self {
             core,
             rt: runtime,
             notify,
             factory,
             pending: Arc::new(DashMap::new()),
+            rx,
         }
     }
 }
 
 impl<R> Driver for TokioTcpDriver<R>
 where
-    R: Runtime
+    R: Runtime,
 {
     fn start(&self) {
         let rt = self.rt.clone();
@@ -44,76 +58,85 @@ where
         let core = self.core.clone();
         let factory = self.factory.clone();
         let pending = self.pending.clone();
+        let rx = self.rx.clone();
 
         rt.spawn(Box::pin(async move {
             let mut rx_buf = BytesMut::new();
             loop {
-                nt.notified().await;
+                // nt.notified().await;
                 // TODO убирать txBuf, если не удается его прочитать и 
отсылать ошибку
-                while let Some(data) = {
-                    let mut guard = core.lock().await;
-                    guard.poll_transmit()
-                } {
-                    if !pending.contains_key(&data.id) {
-                        error!("Failed to get transport adapter id");
-                        continue;
-                    }
+                match rx.recv_async().await {
+                    Ok(data) => {
+                        let (code, payload, id) = data;
+                        if !pending.contains_key(&id) {
+                            error!("Failed to get transport adapter id");
+                            continue;
+                        }
 
-                    let stream = match factory.stream.lock().await.clone() {
-                        Some(s) => s,
-                        None => { error!("Not connected"); break }
-                    };
+                        let stream = match factory.stream.lock().await.clone() 
{
+                            Some(s) => s,
+                            None => {
+                                error!("Not connected");
+                                break;
+                            }
+                        };
 
-                    if let Err(e) = 
stream.send_vectored(&data.as_slices()).await {
-                        error!("Failed to send vectored: {e}");
-                        continue;
-                    }
+                        let payload_len = (payload.len() + 
proto::connection::REQUEST_INITIAL_BYTES_LENGTH).to_le_bytes();
+                        let code = code.to_le_bytes();
+                        let io_slices = [
+                            IoSlice::new(&payload_len),
+                            IoSlice::new(&code),
+                            IoSlice::new(&payload),
+                        ];
+                        if let Err(e) = stream.send_vectored(&io_slices).await 
{
+                            error!("Failed to send vectored: {e}");
+                            continue;
+                        }
 
-                    let init_len = core.lock().await.initial_bytes_len();
-                    let mut at_most = init_len;
-                    loop {
-                        rx_buf.reserve(at_most);
+                        let init_len = 
proto::connection::REQUEST_INITIAL_BYTES_LENGTH;
+                        let mut at_most = init_len;
+                        loop {
+                            rx_buf.reserve(at_most);
 
-                        match stream.read_buf(&mut rx_buf).await {
-                            Ok(0)   => {
-                                error!("EOF before header/body");
-                                panic!("EOF before header/body");
-                                break
-                            }
-                            Ok(n)   => n,
-                            Err(e)  => {
-                                error!("read_buf failed: {e}");
-                                panic!("read_buf failed: {e}");
-                                break
-                            }
-                        };
+                            match stream.read_buf(&mut rx_buf).await {
+                                Ok(0) => {
+                                    error!("EOF before header/body");
+                                    panic!("EOF before header/body");
+                                    break;
+                                }
+                                Ok(n) => n,
+                                Err(e) => {
+                                    error!("read_buf failed: {e}");
+                                    panic!("read_buf failed: {e}");
+                                    break;
+                                }
+                            };
 
-                        let inbound = {
-                            let mut guard = core.lock().await;
-                            guard.feed_inbound(&rx_buf[..])
-                        };
+                            let inbound = feed_inbound(&rx_buf[..]);
 
-                        match inbound {
-                            InboundResult::Need(need) => at_most = need,
-                            InboundResult::Ready(start, end) => {
-                                rx_buf.advance(start);
-                                let frame = rx_buf.split_to(end - 
start).freeze();
-                                if let Some((_k, tx)) = 
pending.remove(&data.id) {
-                                    let _ = tx.send(frame);
+                            match inbound {
+                                InboundResult::Need(need) => at_most = need,
+                                InboundResult::Ready(start, end) => {
+                                    rx_buf.advance(start);
+                                    let frame = rx_buf.split_to(end - 
start).freeze();
+                                    if let Some((_k, tx)) = 
pending.remove(&id) {
+                                        let _ = tx.send(frame);
+                                    }
+                                    // core.try_lock().unwrap().mark_tx_done();
+                                    at_most = init_len;
+                                    rx_buf.clear();
+                                    break;
+                                }
+                                InboundResult::Error(e) => {
+                                    pending.remove(&id);   
+                                    // core.lock().await.mark_tx_done();
+                                    panic!("read_buf failed: {e}");
+                                    break;
                                 }
-                                core.try_lock().unwrap().mark_tx_done();
-                                at_most = init_len;
-                                rx_buf.clear();
-                                break;
-                            }
-                            InboundResult::Error(e) => {
-                                pending.remove(&data.id);
-                                core.lock().await.mark_tx_done();
-                                panic!("read_buf failed: {e}");
-                                break;
                             }
                         }
                     }
+                    Err(e) => {}
                 }
             }
         }));
diff --git a/core/sdk/src/proto/connection.rs b/core/sdk/src/proto/connection.rs
index 62e86264..509986fe 100644
--- a/core/sdk/src/proto/connection.rs
+++ b/core/sdk/src/proto/connection.rs
@@ -5,7 +5,7 @@ use iggy_common::{ClientState, Command, IggyDuration, IggyError, IggyErrorDiscri
 use std::io::IoSlice;
 use tracing::{error, trace};
 
-const REQUEST_INITIAL_BYTES_LENGTH: usize = 4;
+pub const REQUEST_INITIAL_BYTES_LENGTH: usize = 4;
 const RESPONSE_INITIAL_BYTES_LENGTH: usize = 8;
 const ALREADY_EXISTS_STATUSES: &[u32] = &[
     IggyErrorDiscriminants::TopicIdAlreadyExists as u32,
@@ -183,7 +183,7 @@ impl IggyCore {
         RESPONSE_INITIAL_BYTES_LENGTH
     }
 
-    pub fn feed_inbound(&mut self, cur: &[u8]) -> InboundResult {
+    pub fn feed_inbound(&self, cur: &[u8]) -> InboundResult {
         let buf_len = cur.len();
         if buf_len < RESPONSE_INITIAL_BYTES_LENGTH {
             return InboundResult::Need(RESPONSE_INITIAL_BYTES_LENGTH - 
buf_len);
@@ -239,3 +239,50 @@ impl IggyCore {
         self.state = ClientState::Disconnected;
     }
 }
+
+pub fn feed_inbound(cur: &[u8]) -> InboundResult {
+    let buf_len = cur.len();
+    if buf_len < RESPONSE_INITIAL_BYTES_LENGTH {
+        return InboundResult::Need(RESPONSE_INITIAL_BYTES_LENGTH - buf_len);
+    }
+
+    let status = match cur[..4].try_into() {
+        Ok(bytes) => u32::from_le_bytes(bytes),
+        Err(_) => return 
InboundResult::Error(IggyError::InvalidNumberEncoding),
+    };
+
+    let length = match cur[4..8].try_into() {
+        Ok(bytes) => u32::from_le_bytes(bytes),
+        Err(_) => return 
InboundResult::Error(IggyError::InvalidNumberEncoding),
+    };
+
+    if status != 0 {
+        if ALREADY_EXISTS_STATUSES.contains(&status) {
+            tracing::debug!(
+                "Received a server resource already exists response: {} ({})",
+                status,
+                IggyError::from_code_as_string(status)
+            )
+        } else {
+            error!(
+                "Received an invalid response with status: {} ({}).",
+                status,
+                IggyError::from_code_as_string(status),
+            );
+        }
+        return InboundResult::Error(IggyError::from_code(status));
+    }
+
+    trace!("Status: OK. Response length: {}", length);
+    if length <= 1 {
+        return InboundResult::Ready(0, 0);
+    }
+
+    let total = RESPONSE_INITIAL_BYTES_LENGTH + length as usize;
+    if buf_len < total {
+        return InboundResult::Need(total - buf_len);
+    }
+
+    InboundResult::Ready(8, total)
+}
+
diff --git a/core/sdk/src/transport_adapter/async.rs b/core/sdk/src/transport_adapter/async.rs
index d5602944..dfda5105 100644
--- a/core/sdk/src/transport_adapter/async.rs
+++ b/core/sdk/src/transport_adapter/async.rs
@@ -29,6 +29,7 @@ pub struct AsyncTransportAdapter<F: ConnectionFactory, R: Runtime, D: Driver> {
     id: AtomicU64,
     driver: Arc<D>,
     events: (Sender<DiagnosticEvent>, Receiver<DiagnosticEvent>),
+    tx: flume::Sender<(u32, Bytes, u64),>
 }
 
 impl<F, R, D> AsyncTransportAdapter<F, R, D>
@@ -37,7 +38,7 @@ where
     R: Runtime + Send + Sync + 'static,
     D: Driver + Send + Sync,
 {
-    pub fn new(factory: Arc<F>, runtime: Arc<R>, core: 
Arc<sync::Mutex<IggyCore>>, driver: D, notify: Arc<Notify>) -> Self {
+    pub fn new(factory: Arc<F>, runtime: Arc<R>, core: 
Arc<sync::Mutex<IggyCore>>, driver: D, notify: Arc<Notify>, tx: 
flume::Sender<(u32, Bytes, u64)>) -> Self {
         driver.start();
         Self {
             factory: factory,
@@ -47,6 +48,7 @@ where
             id: AtomicU64::new(0),
             driver: Arc::new(driver),
             events: broadcast(1000),
+            tx,
         }
     }
     // async fn send_with_response<T: Command>(&self, command: &T) -> 
Result<RespFut, IggyError> {
@@ -142,9 +144,10 @@ where
             let (tx, rx) = runtime::oneshot::<Bytes>();
             let current_id = self.id.fetch_add(1, 
std::sync::atomic::Ordering::SeqCst);
 
-            self.core.lock().await.write(code, payload, current_id)?;
+            // self.core.lock().await.write(code, payload, current_id)?;
             self.driver.register(current_id, tx);
-            self.notify.notify_waiters();
+            self.tx.send_async((code, payload, current_id)).await;
+            // self.notify.notify_waiters();
 
             let resp = RespFut{rx};
             resp.await

Reply via email to