This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/draft-sans by this push:
     new d8b48b3f test
d8b48b3f is described below

commit d8b48b3f6137e71d7ce24f5038d4f4f9b93f1182
Author: haze518 <[email protected]>
AuthorDate: Sat Jul 12 19:09:07 2025 +0600

    test
---
 core/sdk/src/connection/mod.rs      |   4 +-
 core/sdk/src/connection/quic/mod.rs | 392 ++++++++++++++++++------------------
 core/sdk/src/connection/tcp/mod.rs  |  52 ++---
 core/sdk/src/connection/tcp/tcp.rs  |  16 +-
 core/sdk/src/driver/mod.rs          | 172 ++++++++--------
 core/sdk/src/driver/tcp.rs          |  23 ++-
 6 files changed, 335 insertions(+), 324 deletions(-)

diff --git a/core/sdk/src/connection/mod.rs b/core/sdk/src/connection/mod.rs
index e028ad38..64fa975f 100644
--- a/core/sdk/src/connection/mod.rs
+++ b/core/sdk/src/connection/mod.rs
@@ -19,6 +19,6 @@ pub trait StreamConnectionFactory: ConnectionFactory {
 }
 
 pub trait StreamPair: Send {
-    fn send_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'_>]) -> Pin<Box<dyn 
Future<Output = Result<(), IggyError>> + Send + 'a>>;
-    fn read_buf<'a>(&'a mut self, buf: &'a mut BytesMut) -> Pin<Box<dyn 
Future<Output = Result<usize, IggyError>> + Send + 'a>>;
+    fn send_vectored<'a>(&'a self, bufs: &'a [IoSlice<'_>]) -> Pin<Box<dyn 
Future<Output = Result<(), IggyError>> + Send + 'a>>;
+    fn read_buf<'a>(&'a self, buf: &'a mut BytesMut) -> Pin<Box<dyn 
Future<Output = Result<usize, IggyError>> + Send + 'a>>;
 }
diff --git a/core/sdk/src/connection/quic/mod.rs 
b/core/sdk/src/connection/quic/mod.rs
index 0f58b4a1..3305654e 100644
--- a/core/sdk/src/connection/quic/mod.rs
+++ b/core/sdk/src/connection/quic/mod.rs
@@ -18,199 +18,199 @@ pub struct QuinnStreamPair {
     recv: RecvStream,
 }
 
-impl StreamPair for QuinnStreamPair {
-    fn send_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'_>]) -> Pin<Box<dyn 
Future<Output = Result<(), IggyError>> + Send + 'a>> {
-        Box::pin(async move {
-            self.send.write_vectored(bufs).await.map_err(|e| {
-                error!("Failed to write vectored buffs to quic conn: {e}");
-                IggyError::QuicError
-            })?;
-            self.send.finish().map_err(|e: quinn::ClosedStream| {
-                error!("Failed to finish sending data: {e}");
-                IggyError::QuicError
-            })?;
-            Ok(())
-        })
-    }
-
-    fn read_buf<'a>(&'a mut self, mut buf: &'a mut BytesMut) -> Pin<Box<dyn 
Future<Output = Result<usize, IggyError>> + Send + 'a>> {
-        Box::pin(async move {
-            self.recv.read_buf(&mut buf).await.map_err(|e| {
-                error!("Failed to read chunk: {e}");
-                IggyError::QuicError
-            })
-        })
-    }
-}
-
-pub struct QuinnFactory {
-    config: Arc<QuicClientConfig>,
-    ep: Arc<Endpoint>,
-    connection: Arc<sync::Mutex<Option<Connection>>>,
-    server_address: SocketAddr,
-}
-
-impl ConnectionFactory for QuinnFactory {
-    fn connect(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> + 
Send + Sync>> {
-        let ep  = self.ep.clone();
-        let sn  = self.config.server_name.clone();
-        let sa = self.server_address.clone();
-        let conn = self.connection.clone();
-        Box::pin(async move {
-            let mut connection = conn.lock().await;
-            let connecting = ep
-                .connect(sa, &sn)
-                .map_err(|_| IggyError::CannotEstablishConnection)?;
-
-            let new_conn = connecting
-                .await
-                .map_err(|_| IggyError::CannotEstablishConnection)?;
-            let _ = connection.insert(new_conn);
-            Ok(())
-        })
-    }
-
-    fn is_alive(&self) -> Pin<Box<dyn Future<Output = bool> + Send + Sync>> {
-        let conn = self.connection.clone();
-        Box::pin(async move {
-            let conn = conn.lock().await;
-            match conn.as_ref() {
-                Some(c) => c.close_reason().is_some(),
-                None => false,
-            }
-        })
-    }
-
-    fn shutdown(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> + 
Send + Sync>> {
-        let conn = self.connection.clone();
-        let ep  = self.ep.clone();
-        Box::pin(async move {
-            if let Some(conn) = conn.lock().await.take() {
-                conn.close(0u32.into(), b"");
-            }
-            ep.wait_idle().await;
-            Ok(())
-        })
-    }
-}
-
-impl StreamConnectionFactory for QuinnFactory {
-    type Stream = QuinnStreamPair;
-
-    fn open_stream(&self) -> Pin<Box<dyn Future<Output = Result<Self::Stream, 
IggyError>> + Send + '_>> {
-        let conn = self.connection.clone();
-        Box::pin(async move {
-            let conn = {
-                let guard = conn.lock().await;
-                guard.clone().ok_or(IggyError::NotConnected)?
-            };
-            let (send, recv) = conn.open_bi().await.map_err(|e| {
-                error!("Failed to open a bidirectional stream: {e}");
-                IggyError::QuicError 
-            })?;
-            Ok(QuinnStreamPair { send, recv })
-        })
-    }
-}
-
-impl QuinnFactory {
-    pub fn new(config: QuicClientConfig) -> Result<Self, IggyError> {
-        let cfg = Arc::new(config);
-
-        let server_address = cfg
-            .server_address
-            .parse::<SocketAddr>()
-            .map_err(|error| {
-                error!("Invalid server address: {error}");
-                IggyError::InvalidServerAddress
-            })?;
-        let client_address = if server_address.is_ipv6()
-            && cfg.client_address == QuicClientConfig::default().client_address
-        {
-            "[::1]:0"
-        } else {
-            &cfg.client_address
-        }
-        .parse::<SocketAddr>()
-        .map_err(|error| {
-            error!("Invalid client address: {error}");
-            IggyError::InvalidClientAddress
-        })?;
-
-        let quic_config = configure(&cfg)?;
-        let endpoint = Endpoint::client(client_address);
-        if endpoint.is_err() {
-            error!("Cannot create client endpoint");
-            return Err(IggyError::CannotCreateEndpoint);
-        }
-
-        let mut endpoint = endpoint.unwrap();
-        endpoint.set_default_client_config(quic_config);
-
-        Ok(Self { config: cfg, ep: Arc::new(endpoint), server_address, 
connection: Arc::new(sync::Mutex::new(None)) })
-    }
-}
-
-fn configure(config: &QuicClientConfig) -> Result<ClientConfig, IggyError> {
-    let max_concurrent_bidi_streams = 
VarInt::try_from(config.max_concurrent_bidi_streams);
-    if max_concurrent_bidi_streams.is_err() {
-        error!(
-            "Invalid 'max_concurrent_bidi_streams': {}",
-            config.max_concurrent_bidi_streams
-        );
-        return Err(IggyError::InvalidConfiguration);
-    }
-
-    let receive_window = VarInt::try_from(config.receive_window);
-    if receive_window.is_err() {
-        error!("Invalid 'receive_window': {}", config.receive_window);
-        return Err(IggyError::InvalidConfiguration);
-    }
-
-    let mut transport = quinn::TransportConfig::default();
-    transport.initial_mtu(config.initial_mtu);
-    transport.send_window(config.send_window);
-    transport.receive_window(receive_window.unwrap());
-    transport.datagram_send_buffer_size(config.datagram_send_buffer_size as 
usize);
-    
transport.max_concurrent_bidi_streams(max_concurrent_bidi_streams.unwrap());
-    if config.keep_alive_interval > 0 {
-        
transport.keep_alive_interval(Some(Duration::from_millis(config.keep_alive_interval)));
-    }
-    if config.max_idle_timeout > 0 {
-        let max_idle_timeout =
-            
IdleTimeout::try_from(Duration::from_millis(config.max_idle_timeout));
-        if max_idle_timeout.is_err() {
-            error!("Invalid 'max_idle_timeout': {}", config.max_idle_timeout);
-            return Err(IggyError::InvalidConfiguration);
-        }
-        transport.max_idle_timeout(Some(max_idle_timeout.unwrap()));
-    }
-
-    if CryptoProvider::get_default().is_none() {
-        if let Err(e) = 
rustls::crypto::ring::default_provider().install_default() {
-            warn!(
-                "Failed to install rustls crypto provider. Error: {:?}. This 
may be normal if another thread installed it first.",
-                e
-            );
-        }
-    }
-    let mut client_config = match config.validate_certificate {
-        true => ClientConfig::with_platform_verifier(),
-        false => {
-            match QuinnQuicClientConfig::try_from(
-                rustls::ClientConfig::builder()
-                    .dangerous()
-                    
.with_custom_certificate_verifier(SkipServerVerification::new())
-                    .with_no_client_auth(),
-            ) {
-                Ok(config) => ClientConfig::new(Arc::new(config)),
-                Err(error) => {
-                    error!("Failed to create QUIC client configuration: 
{error}");
-                    return Err(IggyError::InvalidConfiguration);
-                }
-            }
-        }
-    };
-    client_config.transport_config(Arc::new(transport));
-    Ok(client_config)
-}
+// impl StreamPair for QuinnStreamPair {
+//     fn send_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'_>]) -> 
Pin<Box<dyn Future<Output = Result<(), IggyError>> + Send + 'a>> {
+//         Box::pin(async move {
+//             self.send.write_vectored(bufs).await.map_err(|e| {
+//                 error!("Failed to write vectored buffs to quic conn: {e}");
+//                 IggyError::QuicError
+//             })?;
+//             self.send.finish().map_err(|e: quinn::ClosedStream| {
+//                 error!("Failed to finish sending data: {e}");
+//                 IggyError::QuicError
+//             })?;
+//             Ok(())
+//         })
+//     }
+
+//     fn read_buf<'a>(&'a mut self, mut buf: &'a mut BytesMut) -> Pin<Box<dyn 
Future<Output = Result<usize, IggyError>> + Send + 'a>> {
+//         Box::pin(async move {
+//             self.recv.read_buf(&mut buf).await.map_err(|e| {
+//                 error!("Failed to read chunk: {e}");
+//                 IggyError::QuicError
+//             })
+//         })
+//     }
+// }
+
+// pub struct QuinnFactory {
+//     config: Arc<QuicClientConfig>,
+//     ep: Arc<Endpoint>,
+//     connection: Arc<sync::Mutex<Option<Connection>>>,
+//     server_address: SocketAddr,
+// }
+
+// impl ConnectionFactory for QuinnFactory {
+//     fn connect(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> 
+ Send + Sync>> {
+//         let ep  = self.ep.clone();
+//         let sn  = self.config.server_name.clone();
+//         let sa = self.server_address.clone();
+//         let conn = self.connection.clone();
+//         Box::pin(async move {
+//             let mut connection = conn.lock().await;
+//             let connecting = ep
+//                 .connect(sa, &sn)
+//                 .map_err(|_| IggyError::CannotEstablishConnection)?;
+
+//             let new_conn = connecting
+//                 .await
+//                 .map_err(|_| IggyError::CannotEstablishConnection)?;
+//             let _ = connection.insert(new_conn);
+//             Ok(())
+//         })
+//     }
+
+//     fn is_alive(&self) -> Pin<Box<dyn Future<Output = bool> + Send + Sync>> 
{
+//         let conn = self.connection.clone();
+//         Box::pin(async move {
+//             let conn = conn.lock().await;
+//             match conn.as_ref() {
+//                 Some(c) => c.close_reason().is_some(),
+//                 None => false,
+//             }
+//         })
+//     }
+
+//     fn shutdown(&self) -> Pin<Box<dyn Future<Output = Result<(), 
IggyError>> + Send + Sync>> {
+//         let conn = self.connection.clone();
+//         let ep  = self.ep.clone();
+//         Box::pin(async move {
+//             if let Some(conn) = conn.lock().await.take() {
+//                 conn.close(0u32.into(), b"");
+//             }
+//             ep.wait_idle().await;
+//             Ok(())
+//         })
+//     }
+// }
+
+// impl StreamConnectionFactory for QuinnFactory {
+//     type Stream = QuinnStreamPair;
+
+//     fn open_stream(&self) -> Pin<Box<dyn Future<Output = 
Result<Self::Stream, IggyError>> + Send + '_>> {
+//         let conn = self.connection.clone();
+//         Box::pin(async move {
+//             let conn = {
+//                 let guard = conn.lock().await;
+//                 guard.clone().ok_or(IggyError::NotConnected)?
+//             };
+//             let (send, recv) = conn.open_bi().await.map_err(|e| {
+//                 error!("Failed to open a bidirectional stream: {e}");
+//                 IggyError::QuicError 
+//             })?;
+//             Ok(QuinnStreamPair { send, recv })
+//         })
+//     }
+// }
+
+// impl QuinnFactory {
+//     pub fn new(config: QuicClientConfig) -> Result<Self, IggyError> {
+//         let cfg = Arc::new(config);
+
+//         let server_address = cfg
+//             .server_address
+//             .parse::<SocketAddr>()
+//             .map_err(|error| {
+//                 error!("Invalid server address: {error}");
+//                 IggyError::InvalidServerAddress
+//             })?;
+//         let client_address = if server_address.is_ipv6()
+//             && cfg.client_address == 
QuicClientConfig::default().client_address
+//         {
+//             "[::1]:0"
+//         } else {
+//             &cfg.client_address
+//         }
+//         .parse::<SocketAddr>()
+//         .map_err(|error| {
+//             error!("Invalid client address: {error}");
+//             IggyError::InvalidClientAddress
+//         })?;
+
+//         let quic_config = configure(&cfg)?;
+//         let endpoint = Endpoint::client(client_address);
+//         if endpoint.is_err() {
+//             error!("Cannot create client endpoint");
+//             return Err(IggyError::CannotCreateEndpoint);
+//         }
+
+//         let mut endpoint = endpoint.unwrap();
+//         endpoint.set_default_client_config(quic_config);
+
+//         Ok(Self { config: cfg, ep: Arc::new(endpoint), server_address, 
connection: Arc::new(sync::Mutex::new(None)) })
+//     }
+// }
+
+// fn configure(config: &QuicClientConfig) -> Result<ClientConfig, IggyError> {
+//     let max_concurrent_bidi_streams = 
VarInt::try_from(config.max_concurrent_bidi_streams);
+//     if max_concurrent_bidi_streams.is_err() {
+//         error!(
+//             "Invalid 'max_concurrent_bidi_streams': {}",
+//             config.max_concurrent_bidi_streams
+//         );
+//         return Err(IggyError::InvalidConfiguration);
+//     }
+
+//     let receive_window = VarInt::try_from(config.receive_window);
+//     if receive_window.is_err() {
+//         error!("Invalid 'receive_window': {}", config.receive_window);
+//         return Err(IggyError::InvalidConfiguration);
+//     }
+
+//     let mut transport = quinn::TransportConfig::default();
+//     transport.initial_mtu(config.initial_mtu);
+//     transport.send_window(config.send_window);
+//     transport.receive_window(receive_window.unwrap());
+//     transport.datagram_send_buffer_size(config.datagram_send_buffer_size as 
usize);
+//     
transport.max_concurrent_bidi_streams(max_concurrent_bidi_streams.unwrap());
+//     if config.keep_alive_interval > 0 {
+//         
transport.keep_alive_interval(Some(Duration::from_millis(config.keep_alive_interval)));
+//     }
+//     if config.max_idle_timeout > 0 {
+//         let max_idle_timeout =
+//             
IdleTimeout::try_from(Duration::from_millis(config.max_idle_timeout));
+//         if max_idle_timeout.is_err() {
+//             error!("Invalid 'max_idle_timeout': {}", 
config.max_idle_timeout);
+//             return Err(IggyError::InvalidConfiguration);
+//         }
+//         transport.max_idle_timeout(Some(max_idle_timeout.unwrap()));
+//     }
+
+//     if CryptoProvider::get_default().is_none() {
+//         if let Err(e) = 
rustls::crypto::ring::default_provider().install_default() {
+//             warn!(
+//                 "Failed to install rustls crypto provider. Error: {:?}. 
This may be normal if another thread installed it first.",
+//                 e
+//             );
+//         }
+//     }
+//     let mut client_config = match config.validate_certificate {
+//         true => ClientConfig::with_platform_verifier(),
+//         false => {
+//             match QuinnQuicClientConfig::try_from(
+//                 rustls::ClientConfig::builder()
+//                     .dangerous()
+//                     
.with_custom_certificate_verifier(SkipServerVerification::new())
+//                     .with_no_client_auth(),
+//             ) {
+//                 Ok(config) => ClientConfig::new(Arc::new(config)),
+//                 Err(error) => {
+//                     error!("Failed to create QUIC client configuration: 
{error}");
+//                     return Err(IggyError::InvalidConfiguration);
+//                 }
+//             }
+//         }
+//     };
+//     client_config.transport_config(Arc::new(transport));
+//     Ok(client_config)
+// }
diff --git a/core/sdk/src/connection/tcp/mod.rs 
b/core/sdk/src/connection/tcp/mod.rs
index 51993511..3a6a15d8 100644
--- a/core/sdk/src/connection/tcp/mod.rs
+++ b/core/sdk/src/connection/tcp/mod.rs
@@ -11,7 +11,7 @@ use tokio::{
 };
 use iggy_common::IggyError;
 
-use crate::connection::StreamPair;
+use crate::{connection::StreamPair, proto::runtime::sync};
 
 pub trait AsyncStream: AsyncRead + AsyncWrite + Unpin + Send + 'static {}
 impl<T> AsyncStream for T where T: AsyncRead + AsyncWrite + Unpin + Send + 
'static {}
@@ -24,46 +24,50 @@ pub trait SocketFactory {
 
 #[derive(Debug)]
 pub struct TokioTcpStream {
-    reader: BufReader<OwnedReadHalf>,
-    writer: BufWriter<OwnedWriteHalf>,
+    reader: sync::Mutex<BufReader<OwnedReadHalf>>,
+    writer: sync::Mutex<BufWriter<OwnedWriteHalf>>,
 }
 
 impl StreamPair for TokioTcpStream {
     fn send_vectored<'a>(
-        &'a mut self,
+        &'a self,
         bufs: &'a [io::IoSlice<'_>],
     ) -> Pin<Box<dyn Future<Output = Result<(), iggy_common::IggyError>> + 
Send + 'a>> {
         Box::pin(async move {
-            for val in bufs {
-                self.writer.write(val).await.map_err(|e| {
-                    error!(
-                        "Failed to write data to the TCP connection: {e}",
-                    );
-                    IggyError::TcpError
-                })?;
-            }
-            // self.writer.write_vectored(bufs).await.map_err(|e| {
+            let mut w = self.writer.lock().await;
+            w.write_vectored(bufs).await.map_err(|_| IggyError::TcpError)?;
+            w.flush().await.map_err(|_| IggyError::TcpError)?;
+            Ok(())
+            // for val in bufs {
+            //     self.writer.write(val).await.map_err(|e| {
+            //         error!(
+            //             "Failed to write data to the TCP connection: {e}",
+            //         );
+            //         IggyError::TcpError
+            //     })?;
+            // }
+            // // self.writer.write_vectored(bufs).await.map_err(|e| {
+            // //     error!(
+            // //         "Failed to write data to the TCP connection: {e}",
+            // //     );
+            // //     IggyError::TcpError
+            // // })?;
+            // self.writer.flush().await.map_err(|e| {
             //     error!(
             //         "Failed to write data to the TCP connection: {e}",
             //     );
             //     IggyError::TcpError
             // })?;
-            self.writer.flush().await.map_err(|e| {
-                error!(
-                    "Failed to write data to the TCP connection: {e}",
-                );
-                IggyError::TcpError
-            })?;
-            Ok(())
+            // Ok(())
         })
     }
 
     fn read_buf<'a>(
-        &'a mut self,
+        &'a self,
         buf: &'a mut BytesMut,
     ) -> Pin<Box<dyn Future<Output = Result<usize, iggy_common::IggyError>> + 
Send + 'a>> {
         Box::pin(async move {
-            self.reader.read_buf(buf).await.map_err(|e| {
+            self.reader.lock().await.read_buf(buf).await.map_err(|e| {
                 error!(
                     "Failed to read data from the TCP connection: {e}",
                 );
@@ -77,8 +81,8 @@ impl TokioTcpStream {
     fn new(stream: TcpStream) -> Self {
         let (reader, writer) = stream.into_split();
         Self {
-            reader: BufReader::new(reader),
-            writer: BufWriter::new(writer),
+            reader: sync::Mutex::new(BufReader::new(reader)),
+            writer: sync::Mutex::new(BufWriter::new(writer)),
         }
     }
 }
diff --git a/core/sdk/src/connection/tcp/tcp.rs 
b/core/sdk/src/connection/tcp/tcp.rs
index cf5757f4..70e4049b 100644
--- a/core/sdk/src/connection/tcp/tcp.rs
+++ b/core/sdk/src/connection/tcp/tcp.rs
@@ -15,7 +15,7 @@ pub type TokioCompatStream = 
tokio_util::compat::Compat<tokio::net::TcpStream>;
 pub struct TokioTcpFactory {
     pub(crate) config: Arc<TcpClientConfig>,
     client_address: Arc<sync::Mutex<Option<SocketAddr>>>,
-    pub(crate) stream: Arc<sync::Mutex<Option<TokioTcpStream>>>,
+    pub(crate) stream: Arc<sync::Mutex<Option<Arc<TokioTcpStream>>>>,
 }
 
 impl ConnectionFactory for TokioTcpFactory {
@@ -50,7 +50,7 @@ impl ConnectionFactory for TokioTcpFactory {
                 error!("Failed to set the nodelay option on the client: {e}");
             }
             // TODO add tls
-            let _ = tokio_tcp_stream.insert(TokioTcpStream::new(conn));
+            let _ = 
tokio_tcp_stream.insert(Arc::new(TokioTcpStream::new(conn)));
 
             Ok(())
         })
@@ -58,21 +58,17 @@ impl ConnectionFactory for TokioTcpFactory {
 
     // TODO пока заглушка, нужно подумать насчет того, как это делать
     fn is_alive(&self) -> std::pin::Pin<Box<dyn Future<Output = bool> + Send + 
Sync>> {
-        let conn = self.stream.clone();
+        let slot = self.stream.clone();
         Box::pin(async move {
-            let conn = conn.lock().await;
-            match conn.as_ref() {
-                Some(c) => true,
-                None => false,
-            }
+            slot.lock().await.is_some()
         })
     }
 
     fn shutdown(&self) -> std::pin::Pin<Box<dyn Future<Output = Result<(), 
IggyError>> + Send + Sync>> {
         let conn = self.stream.clone();
         Box::pin(async move {
-            if let Some(mut conn) = conn.lock().await.take() {
-                conn.writer.shutdown().await.map_err(|e| {
+            if let Some(conn) = conn.lock().await.take() {
+                conn.writer.lock().await.shutdown().await.map_err(|e| {
                     error!(
                         "Failed to shutdown the TCP connection to the TCP 
connection: {e}",
                     );
diff --git a/core/sdk/src/driver/mod.rs b/core/sdk/src/driver/mod.rs
index dd7c7dc6..d77ae183 100644
--- a/core/sdk/src/driver/mod.rs
+++ b/core/sdk/src/driver/mod.rs
@@ -10,7 +10,7 @@ use tracing::{error, info, trace, warn};
 
 use crate::{
     connection::{StreamConnectionFactory, StreamPair},
-    connection::quic::QuinnFactory,
+    // connection::quic::QuinnFactory,
     proto::{
         connection::{IggyCore, InboundResult},
         runtime::{Runtime, sync},
@@ -22,98 +22,98 @@ pub trait Driver {
     fn register(&self, id: u64, tx: sync::OneShotSender<Bytes>);
 }
 
-pub struct QuicDriver<R>
-where
-    R: Runtime,
-{
-    core: Arc<sync::Mutex<IggyCore>>,
-    rt: Arc<R>,
-    notify: Arc<sync::Notify>,
-    factory: Arc<QuinnFactory>,
-    pub(crate) config: Arc<QuicClientConfig>, // todo change to 
driverQuicConfig
-    pending: Arc<DashMap<u64, sync::OneShotSender<Bytes>>>,
-}
+// pub struct QuicDriver<R>
+// where
+//     R: Runtime,
+// {
+//     core: Arc<sync::Mutex<IggyCore>>,
+//     rt: Arc<R>,
+//     notify: Arc<sync::Notify>,
+//     factory: Arc<QuinnFactory>,
+//     pub(crate) config: Arc<QuicClientConfig>, // todo change to 
driverQuicConfig
+//     pending: Arc<DashMap<u64, sync::OneShotSender<Bytes>>>,
+// }
 
-// TODO add errChan
-impl<R> Driver for QuicDriver<R>
-where
-    R: Runtime,
-{
-    fn start(&self) {
-        let rt = self.rt.clone();
-        let nt = self.notify.clone();
-        let core = self.core.clone();
-        let q = self.factory.clone();
-        let cfg = self.config.clone();
-        let pending = self.pending.clone();
-        rt.spawn(Box::pin(async move {
-            let mut rx_buf = BytesMut::new();
-            loop {
-                nt.notified().await;
+// // TODO add errChan
+// impl<R> Driver for QuicDriver<R>
+// where
+//     R: Runtime,
+// {
+//     fn start(&self) {
+//         let rt = self.rt.clone();
+//         let nt = self.notify.clone();
+//         let core = self.core.clone();
+//         let q = self.factory.clone();
+//         let cfg = self.config.clone();
+//         let pending = self.pending.clone();
+//         rt.spawn(Box::pin(async move {
+//             let mut rx_buf = BytesMut::new();
+//             loop {
+//                 nt.notified().await;
 
-                while let Some(data) = {
-                    let mut guard = core.lock().await;
-                    guard.poll_transmit()
-                } {
-                    if !pending.contains_key(&data.id) {
-                        error!("Failed to get transport adapter id");
-                        continue;
-                    }
+//                 while let Some(data) = {
+//                     let mut guard = core.lock().await;
+//                     guard.poll_transmit()
+//                 } {
+//                     if !pending.contains_key(&data.id) {
+//                         error!("Failed to get transport adapter id");
+//                         continue;
+//                     }
 
-                    let mut stream = match q.open_stream().await {
-                        Ok(s) => s,
-                        Err(e) => {
-                            error!("Failed to open a bidirectional stream: 
{e}");
-                            continue;
-                        }
-                    };
+//                     let mut stream = match q.open_stream().await {
+//                         Ok(s) => s,
+//                         Err(e) => {
+//                             error!("Failed to open a bidirectional stream: 
{e}");
+//                             continue;
+//                         }
+//                     };
 
-                    if let Err(e) = 
stream.send_vectored(&data.as_slices()).await {
-                        error!("Failed to send vectored: {e}");
-                        break;
-                    }
+//                     if let Err(e) = 
stream.send_vectored(&data.as_slices()).await {
+//                         error!("Failed to send vectored: {e}");
+//                         break;
+//                     }
 
-                    let mut at_most = cfg.response_buffer_size as usize;
-                    loop {
-                        rx_buf.reserve(at_most);
+//                     let mut at_most = cfg.response_buffer_size as usize;
+//                     loop {
+//                         rx_buf.reserve(at_most);
 
-                        match stream.read_buf(&mut rx_buf).await {
-                            Ok(0)   => { error!("EOF before header/body"); 
break }
-                            Ok(n)   => n,
-                            Err(e)  => { error!("read_buf failed: {e}");   
break }
-                        };
+//                         match stream.read_buf(&mut rx_buf).await {
+//                             Ok(0)   => { error!("EOF before header/body"); 
break }
+//                             Ok(n)   => n,
+//                             Err(e)  => { error!("read_buf failed: {e}");   
break }
+//                         };
 
-                        let buf = Cursor::new(&rx_buf[..]);
+//                         let buf = Cursor::new(&rx_buf[..]);
 
-                        let inbound = {
-                            let mut guard = core.lock().await;
-                            guard.feed_inbound(buf)
-                        };
+//                         let inbound = {
+//                             let mut guard = core.lock().await;
+//                             guard.feed_inbound(buf)
+//                         };
 
-                        match inbound {
-                            InboundResult::Need(need) => at_most = need,
-                            InboundResult::Ready(position) => {
-                                let frame = rx_buf.split_to(position).freeze();
-                                if let Some((_k, tx)) = 
pending.remove(&data.id) {
-                                    let _ = tx.send(frame);
-                                }
-                                core.lock().await.mark_tx_done();
-                                at_most = cfg.response_buffer_size as usize;
-                                continue;
-                            }
-                            InboundResult::Error(_) => {
-                                let mut guard = core.lock().await;
-                                guard.mark_tx_done();
-                                break;
-                            }
-                        }
-                    }
-                }
-            }
-        }));
-    }
+//                         match inbound {
+//                             InboundResult::Need(need) => at_most = need,
+//                             InboundResult::Ready(position) => {
+//                                 let frame = 
rx_buf.split_to(position).freeze();
+//                                 if let Some((_k, tx)) = 
pending.remove(&data.id) {
+//                                     let _ = tx.send(frame);
+//                                 }
+//                                 core.lock().await.mark_tx_done();
+//                                 at_most = cfg.response_buffer_size as usize;
+//                                 continue;
+//                             }
+//                             InboundResult::Error(_) => {
+//                                 let mut guard = core.lock().await;
+//                                 guard.mark_tx_done();
+//                                 break;
+//                             }
+//                         }
+//                     }
+//                 }
+//             }
+//         }));
+//     }
 
-    fn register(&self, id: u64, tx: sync::OneShotSender<Bytes>) {
-        self.pending.insert(id, tx);
-    }
-}
+//     fn register(&self, id: u64, tx: sync::OneShotSender<Bytes>) {
+//         self.pending.insert(id, tx);
+//     }
+// }
diff --git a/core/sdk/src/driver/tcp.rs b/core/sdk/src/driver/tcp.rs
index 2bfabe29..0b5a5a2b 100644
--- a/core/sdk/src/driver/tcp.rs
+++ b/core/sdk/src/driver/tcp.rs
@@ -59,8 +59,10 @@ where
                         continue;
                     }
 
-                    let mut guard = factory.stream.lock().await;
-                    let stream = guard.as_mut().unwrap();
+                    let stream = match factory.stream.lock().await.clone() {
+                        Some(s) => s,
+                        None => { error!("Not connected"); break }
+                    };
 
                     if let Err(e) = 
stream.send_vectored(&data.as_slices()).await {
                         error!("Failed to send vectored: {e}");
@@ -73,9 +75,17 @@ where
                         rx_buf.reserve(at_most);
 
                         match stream.read_buf(&mut rx_buf).await {
-                            Ok(0)   => { error!("EOF before header/body"); 
break }
+                            Ok(0)   => {
+                                error!("EOF before header/body");
+                                panic!("EOF before header/body");
+                                break
+                            }
                             Ok(n)   => n,
-                            Err(e)  => { error!("read_buf failed: {e}");   
break }
+                            Err(e)  => {
+                                error!("read_buf failed: {e}");
+                                panic!("read_buf failed");
+                                break
+                            }
                         };
 
                         let buf = Cursor::new(&rx_buf[..]);
@@ -92,13 +102,14 @@ where
                                 if let Some((_k, tx)) = 
pending.remove(&data.id) {
                                     let _ = tx.send(frame);
                                 }
-                                core.lock().await.mark_tx_done();
+                                core.try_lock().unwrap().mark_tx_done();
                                 at_most = init_len;
-                                continue;
+                                break;
                             }
                             InboundResult::Error(_) => {
                                 pending.remove(&data.id);
                                 core.lock().await.mark_tx_done();
+                                panic!("read_buf failed");
                                 break;
                             }
                         }

Reply via email to