This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/draft-sans by this push:
     new b9e15339 del
b9e15339 is described below

commit b9e1533989cffe7b90fc87e313614385e5338bef
Author: haze518 <[email protected]>
AuthorDate: Thu Jul 10 07:23:07 2025 +0600

    del
---
 core/sdk/src/connection/mod.rs          | 15 ++++++++++++++-
 core/sdk/src/connection/quic/mod.rs     | 26 ++++++++++++--------------
 core/sdk/src/connection/tcp/mod.rs      | 13 +++++++++++++
 core/sdk/src/driver/mod.rs              | 29 ++++++++++++++---------------
 core/sdk/src/proto/connection.rs        |  7 +++++--
 core/sdk/src/transport_adapter/async.rs |  2 +-
 6 files changed, 59 insertions(+), 33 deletions(-)

diff --git a/core/sdk/src/connection/mod.rs b/core/sdk/src/connection/mod.rs
index d65a1318..f6e06522 100644
--- a/core/sdk/src/connection/mod.rs
+++ b/core/sdk/src/connection/mod.rs
@@ -1,5 +1,6 @@
-use std::pin::Pin;
+use std::{io::IoSlice, pin::Pin};
 
+use bytes::Bytes;
 use iggy_common::IggyError;
 
 pub mod tcp;
@@ -10,3 +11,15 @@ pub trait ConnectionFactory {
     fn is_alive(&self) -> Pin<Box<dyn Future<Output = bool>>>;
     fn shutdown(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> + Send + Sync>>;
 }
+
+pub trait StreamConnectionFactory: ConnectionFactory {
+    type Stream: StreamPair;
+
+    fn open_stream(&self) -> Pin<Box<dyn Future<Output = Result<Self::Stream, IggyError>> + Send + '_>>;
+}
+
+pub trait StreamPair: Send {
+    fn send_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'_>]) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> + Send + 'a>>;
+    fn read_chunk<'a>(&'a mut self, at_most: usize) -> Pin<Box<dyn Future<Output = Result<Option<Bytes>, IggyError>> + Send + 'a>>;
+    fn read_buf<'a>(&'a mut self, buf: &'a mut [u8]) -> Pin<Box<dyn Future<Output = Result<usize, IggyError>> + Send + 'a>>;
+}
diff --git a/core/sdk/src/connection/quic/mod.rs b/core/sdk/src/connection/quic/mod.rs
index 422e7f38..647c8531 100644
--- a/core/sdk/src/connection/quic/mod.rs
+++ b/core/sdk/src/connection/quic/mod.rs
@@ -4,26 +4,15 @@ use std::{io, net::SocketAddr, pin::Pin, time::Duration};
 use bytes::Bytes;
 use iggy_common::{IggyError, QuicClientConfig};
 use rustls::crypto::CryptoProvider;
-use tokio::io::AsyncWriteExt;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
 use tracing::{error, warn};
-use crate::connection::ConnectionFactory;
+use crate::connection::{ConnectionFactory, StreamConnectionFactory, StreamPair};
 use crate::proto::runtime::sync;
 use crate::quic::skip_server_verification::SkipServerVerification;
 
 use quinn::crypto::rustls::QuicClientConfig as QuinnQuicClientConfig;
 use quinn::{ClientConfig, Connection, Endpoint, IdleTimeout, RecvStream, SendStream, VarInt};
 
-pub trait StreamPair: Send {
-    fn send_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'_>]) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> + Send + 'a>>;
-    fn read_chunk<'a>(&'a mut self, at_most: usize) -> Pin<Box<dyn Future<Output = Result<Option<Bytes>, IggyError>> + Send + 'a>>;
-}
-
-pub trait QuicFactory: ConnectionFactory {
-    type Stream: StreamPair;
-
-    fn open_stream(&self) -> Pin<Box<dyn Future<Output = Result<Self::Stream, IggyError>> + Send + '_>>;
-}
-
 pub struct QuinnStreamPair {
     send: SendStream,
     recv: RecvStream,
@@ -56,6 +45,15 @@ impl StreamPair for QuinnStreamPair {
             Ok(None)
         })
     }
+
+    fn read_buf<'a>(&'a mut self, mut buf: &'a mut [u8]) -> Pin<Box<dyn Future<Output = Result<usize, IggyError>> + Send + 'a>> {
+        Box::pin(async move {
+            self.recv.read_buf(&mut buf).await.map_err(|e| {
+                error!("Failed to read chunk: {e}");
+                IggyError::QuicError
+            })
+        })
+    }
 }
 
 pub struct QuinnFactory {
@@ -109,7 +107,7 @@ impl ConnectionFactory for QuinnFactory {
     }
 }
 
-impl QuicFactory for QuinnFactory {
+impl StreamConnectionFactory for QuinnFactory {
     type Stream = QuinnStreamPair;
 
     fn open_stream(&self) -> Pin<Box<dyn Future<Output = Result<Self::Stream, IggyError>> + Send + '_>> {
diff --git a/core/sdk/src/connection/tcp/mod.rs b/core/sdk/src/connection/tcp/mod.rs
index f4774631..c4f77003 100644
--- a/core/sdk/src/connection/tcp/mod.rs
+++ b/core/sdk/src/connection/tcp/mod.rs
@@ -3,6 +3,9 @@ pub mod tls;
 
 use std::{io, net::SocketAddr, pin::Pin};
 use futures::{AsyncRead, AsyncWrite};
+use tokio::{io::{AsyncReadExt, BufReader}, net::tcp::OwnedReadHalf};
+
+use crate::connection::StreamPair;
 
 pub trait AsyncStream: AsyncRead + AsyncWrite + Unpin + Send + 'static {}
 impl<T> AsyncStream for T where T: AsyncRead + AsyncWrite + Unpin + Send + 'static {}
@@ -13,3 +16,13 @@ pub trait SocketFactory {
     fn connect(&self) -> Pin<Box<dyn Future<Output = io::Result<Self::Stream>> + Send>>;
 }
 
+pub struct TokioTcpStream {
+    reader: BufReader<OwnedReadHalf>,
+    buf: 
+}
+
+impl StreamPair for TokioTcpStream {
+    fn read_chunk<'a>(&'a mut self, at_most: usize) -> Pin<Box<dyn Future<Output = Result<Option<bytes::Bytes>, iggy_common::IggyError>> + Send + 'a>> {
+
+    }
+}
diff --git a/core/sdk/src/driver/mod.rs b/core/sdk/src/driver/mod.rs
index 114a12f7..1d794f53 100644
--- a/core/sdk/src/driver/mod.rs
+++ b/core/sdk/src/driver/mod.rs
@@ -1,13 +1,14 @@
 use std::sync::Arc;
 
-use bytes::Bytes;
+use bytes::{Bytes, BytesMut};
 use dashmap::DashMap;
 use iggy_common::{IggyError, QuicClientConfig};
 use tokio::io::AsyncWriteExt;
 use tracing::{error, info, trace, warn};
 
 use crate::{
-    connection::quic::{QuicFactory, QuinnFactory, StreamPair},
+    connection::{StreamConnectionFactory, StreamPair},
+    connection::quic::QuinnFactory,
     proto::{
         connection::{IggyCore, InboundResult},
         runtime::{Runtime, sync},
@@ -43,6 +44,7 @@ where
         let cfg = self.config.clone();
         let pending = self.pending.clone();
         rt.spawn(Box::pin(async move {
+            let mut rx_buf = BytesMut::with_capacity(cfg.response_buffer_size as usize);
             loop {
                 nt.notified().await;
 
@@ -70,31 +72,28 @@ where
 
                     let mut at_most = cfg.response_buffer_size as usize;
                     loop {
-                        let buffer = match stream.read_chunk(at_most).await {
-                            Ok(Some(buf)) => buf,
-                            Ok(None) => {
-                                error!("Unexpected EOF in stream");
-                                break;
-                            }
-                            Err(e) => {
-                                error!("Failed to read response data: {e}");
-                                break;
-                            }
+                        rx_buf.reserve(at_most);
+
+                        match stream.read_buf(&mut rx_buf).await {
+                            Ok(0)   => { error!("EOF before header/body"); break }
+                            Ok(n)   => n,
+                            Err(e)  => { error!("read_buf failed: {e}");   break }
                         };
 
                         let inbound = {
                             let mut guard = core.lock().await;
-                            guard.feed_inbound(&buffer)
+                            guard.feed_inbound(&rx_buf[..])
                         };
 
                         match inbound {
                             InboundResult::Need(need) => at_most = need,
-                            InboundResult::Response(r) => {
+                            InboundResult::Ready => {
                                 if let Some((_key, tx)) = pending.remove(&data.id) {
-                                    let _ = tx.send(r);
+                                    let _ = tx.send(rx_buf);
                                 }
                                 let mut guard = core.lock().await;
                                 guard.mark_tx_done();
+                                rx_buf.clear();
                                 break;
                             }
                             InboundResult::Error(e) => {
diff --git a/core/sdk/src/proto/connection.rs b/core/sdk/src/proto/connection.rs
index 19c04ebb..d09a6fc9 100644
--- a/core/sdk/src/proto/connection.rs
+++ b/core/sdk/src/proto/connection.rs
@@ -1,6 +1,6 @@
-use std::{collections::VecDeque, pin::Pin, sync::Arc};
+use std::{collections::VecDeque, io::Cursor, pin::Pin, sync::Arc};
 
-use bytes::{Bytes, BytesMut};
+use bytes::{BufMut, Bytes, BytesMut};
 use iggy_common::{ClientState, Command, IggyDuration, IggyError, IggyErrorDiscriminants, IggyTimestamp};
 use std::io::IoSlice;
 use tracing::{error, trace};
@@ -160,6 +160,9 @@ impl IggyCore {
         self.current_tx = None
     }
 
+    // TODO делоает копию сейчас
+    // нужно сделать так, чтобы оно либо говорило с какого индекса по какой payload в ready статусе и при этом убрать rx_buf
+    // либо должно копить у себя и передавать свой буфер наверх, который будет заполняться
     pub fn feed_inbound(&mut self, bytes: &[u8]) -> InboundResult {
         if self.rx_buf.len() < RESPONSE_INITIAL_BYTES_LENGTH {
             let need = RESPONSE_INITIAL_BYTES_LENGTH - self.rx_buf.len();
diff --git a/core/sdk/src/transport_adapter/async.rs b/core/sdk/src/transport_adapter/async.rs
index e0a90446..33b0a27d 100644
--- a/core/sdk/src/transport_adapter/async.rs
+++ b/core/sdk/src/transport_adapter/async.rs
@@ -10,7 +10,7 @@ use tokio::sync::Notify;
 use tracing::{error, trace};
 
 use crate::{
-    connection::{quic::QuicFactory, ConnectionFactory},
+    connection::ConnectionFactory,
     driver::Driver,
     proto::{
         connection::{IggyCore, Order},

Reply via email to