This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/draft-sans by this push:
     new a9252e1b test
a9252e1b is described below

commit a9252e1b4c473f289216173396df358c27d10351
Author: haze518 <[email protected]>
AuthorDate: Mon Jul 14 11:00:31 2025 +0600

    test
---
 core/integration/tests/sdk/producer/mod.rs |  6 +++---
 core/sdk/src/connection/tcp/mod.rs         |  2 +-
 core/sdk/src/driver/tcp.rs                 |  8 +++-----
 core/sdk/src/proto/connection.rs           | 17 ++++++++++++-----
 4 files changed, 19 insertions(+), 14 deletions(-)

diff --git a/core/integration/tests/sdk/producer/mod.rs 
b/core/integration/tests/sdk/producer/mod.rs
index 90f9fe83..7834d903 100644
--- a/core/integration/tests/sdk/producer/mod.rs
+++ b/core/integration/tests/sdk/producer/mod.rs
@@ -76,11 +76,11 @@ async fn cleanup(system_client: &IggyClient) {
 
 #[tokio::test]
 async fn test_async_send() {
-    // let mut test_server = TestServer::default();
-    // test_server.start();
+    let mut test_server = TestServer::default();
+    test_server.start();
 
     let tcp_client_config = TcpClientConfig {
-        server_address: "127.0.0.1:8090".to_string(),
+        server_address: test_server.get_raw_tcp_addr().unwrap(),
         ..TcpClientConfig::default()
     };
     
diff --git a/core/sdk/src/connection/tcp/mod.rs 
b/core/sdk/src/connection/tcp/mod.rs
index 3a6a15d8..5f733990 100644
--- a/core/sdk/src/connection/tcp/mod.rs
+++ b/core/sdk/src/connection/tcp/mod.rs
@@ -36,7 +36,7 @@ impl StreamPair for TokioTcpStream {
         Box::pin(async move {
             let mut w = self.writer.lock().await;
             w.write_vectored(bufs).await.map_err(|_| IggyError::TcpError)?;
-            w.flush().await.map_err(|_| IggyError::TcpError)?;
+            // w.flush().await.map_err(|_| IggyError::TcpError)?;
             Ok(())
             // for val in bufs {
             //     self.writer.write(val).await.map_err(|e| {
diff --git a/core/sdk/src/driver/tcp.rs b/core/sdk/src/driver/tcp.rs
index 0e1eec26..818f226b 100644
--- a/core/sdk/src/driver/tcp.rs
+++ b/core/sdk/src/driver/tcp.rs
@@ -1,6 +1,6 @@
 use std::{io::Cursor, sync::Arc};
 
-use bytes::{Bytes, BytesMut};
+use bytes::{Buf, Bytes, BytesMut};
 use dashmap::DashMap;
 use iggy_common::IggyError;
 use tracing::error;
@@ -88,17 +88,15 @@ where
                             }
                         };
 
-                        let buf = Cursor::new(&rx_buf[..]);
-
                         let inbound = {
                             let mut guard = core.lock().await;
-                            guard.feed_inbound(buf)
+                            guard.feed_inbound(&rx_buf[..])
                         };
 
                         match inbound {
                             InboundResult::Need(need) => at_most = need,
                             InboundResult::Ready(start, end) => {
-                                let _ = rx_buf.split_to(start);
+                                rx_buf.advance(start);
                                 let frame = rx_buf.split_to(end - 
start).freeze();
                                 if let Some((_k, tx)) = 
pending.remove(&data.id) {
                                     let _ = tx.send(frame);
diff --git a/core/sdk/src/proto/connection.rs b/core/sdk/src/proto/connection.rs
index 4da78360..62e86264 100644
--- a/core/sdk/src/proto/connection.rs
+++ b/core/sdk/src/proto/connection.rs
@@ -183,14 +183,21 @@ impl IggyCore {
         RESPONSE_INITIAL_BYTES_LENGTH
     }
 
-    pub fn feed_inbound(&mut self, mut cur: Cursor<&[u8]>) -> InboundResult {
-        let buf_len = cur.get_ref().len();
+    pub fn feed_inbound(&mut self, cur: &[u8]) -> InboundResult {
+        let buf_len = cur.len();
         if buf_len < RESPONSE_INITIAL_BYTES_LENGTH {
             return InboundResult::Need(RESPONSE_INITIAL_BYTES_LENGTH - 
buf_len);
         }
 
-        let status  = cur.get_u32_le();
-        let length  = cur.get_u32_le();
+        let status = match cur[..4].try_into() {
+            Ok(bytes) => u32::from_le_bytes(bytes),
+            Err(_) => return 
InboundResult::Error(IggyError::InvalidNumberEncoding),
+        };
+
+        let length = match cur[4..8].try_into() {
+            Ok(bytes) => u32::from_le_bytes(bytes),
+            Err(_) => return 
InboundResult::Error(IggyError::InvalidNumberEncoding),
+        };
 
         if status != 0 {
             if ALREADY_EXISTS_STATUSES.contains(&status) {
@@ -219,7 +226,7 @@ impl IggyCore {
             return InboundResult::Need(total - buf_len);
         }
 
-        InboundResult::Ready(cur.position() as usize, total)
+        InboundResult::Ready(8, total)
     }
 
     pub fn on_transport_connected(&mut self) {

Reply via email to