This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/draft-sans by this push:
     new e8510799 del
e8510799 is described below

commit e85107995bf1341342b7f02ea45272b53b44bdf8
Author: haze518 <[email protected]>
AuthorDate: Mon Jul 7 11:55:30 2025 +0600

    del
---
 core/common/src/types/command/mod.rs    |  4 +-
 core/sdk/src/connection/quic/mod.rs     | 76 +++++++++++++++++++++++++++++----
 core/sdk/src/driver/mod.rs              | 74 +++++++++++++++++++++-----------
 core/sdk/src/proto/connection.rs        | 37 +++++++++++-----
 core/sdk/src/proto/runtime.rs           |  3 +-
 core/sdk/src/quic/quick_client.rs       |  2 +-
 core/sdk/src/transport_adapter/async.rs | 76 ++++++++++++++++++++++-----------
 7 files changed, 197 insertions(+), 75 deletions(-)

diff --git a/core/common/src/types/command/mod.rs 
b/core/common/src/types/command/mod.rs
index 4c576381..380273da 100644
--- a/core/common/src/types/command/mod.rs
+++ b/core/common/src/types/command/mod.rs
@@ -19,9 +19,9 @@
 use crate::BytesSerializable;
 use crate::Validatable;
 use crate::error::IggyError;
-use std::fmt::Display;
+use std::fmt::{Display, Debug};
 
-pub trait Command: BytesSerializable + Validatable<IggyError> + Send + Sync + 
Display {
+pub trait Command: BytesSerializable + Validatable<IggyError> + Send + Sync + 
Display + Debug {
     fn code(&self) -> u32;
 }
 
diff --git a/core/sdk/src/connection/quic/mod.rs 
b/core/sdk/src/connection/quic/mod.rs
index c5d9feed..c120f153 100644
--- a/core/sdk/src/connection/quic/mod.rs
+++ b/core/sdk/src/connection/quic/mod.rs
@@ -1,39 +1,99 @@
+use std::io::IoSlice;
 use std::sync::Arc;
 use std::{io, net::SocketAddr, pin::Pin, time::Duration};
+use bytes::Bytes;
 use iggy_common::{IggyError, QuicClientConfig};
 use rustls::crypto::CryptoProvider;
+use tokio::io::AsyncWriteExt;
 use tracing::{error, warn};
+use crate::proto::runtime::sync;
 use crate::quic::skip_server_verification::SkipServerVerification;
 
 use quinn::crypto::rustls::QuicClientConfig as QuinnQuicClientConfig;
-use quinn::{ClientConfig, Connection, Endpoint, IdleTimeout, VarInt};
+use quinn::{ClientConfig, Connection, Endpoint, IdleTimeout, RecvStream, 
SendStream, VarInt};
+
+pub trait StreamPair: Send {
+    fn send_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'_>]) -> Pin<Box<dyn 
Future<Output = Result<(), IggyError>> + Send + 'a>>;
+    fn read_chunk<'a>(&'a mut self, at_most: usize) -> Pin<Box<dyn 
Future<Output = Result<Option<Bytes>, IggyError>> + Send + 'a>>;
+}
 
 pub trait QuicFactory {
-    type Conn;
-    fn connect(&self) -> Pin<Box<dyn Future<Output = Result<Self::Conn, 
IggyError>> + Send>>;
+    type Stream: StreamPair;
+    
+    fn connect(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> + 
Send>>;
+    fn open_stream(&self) -> Pin<Box<dyn Future<Output = Result<Self::Stream, 
IggyError>> + Send + '_>>;
+}
+
+pub struct QuinnStreamPair {
+    send: SendStream,
+    recv: RecvStream,
+}
+
+impl StreamPair for QuinnStreamPair {
+    fn send_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'_>]) -> Pin<Box<dyn 
Future<Output = Result<(), IggyError>> + Send + 'a>> {
+        Box::pin(async move {
+            self.send.write_vectored(bufs).await.map_err(|e| {
+                error!("Failed to write vectored buffs to quic conn: {e}");
+                IggyError::QuicError
+            })?;
+            self.send.finish();
+            Ok(())
+        })
+    }
+
+    fn read_chunk<'a>(&'a mut self, at_most: usize) -> Pin<Box<dyn 
Future<Output = Result<Option<Bytes>, IggyError>> + Send + 'a>> {
+        Box::pin(async move {
+            let res = self.recv.read_chunk(at_most, true).await.map_err(|e| {
+                error!("Failed to read chunk: {e}");
+                IggyError::QuicError
+            })?;
+            if let Some(data) = res {
+                return Ok(Some(data.bytes));
+            }
+            Ok(None)
+        })
+    }
 }
 
 pub struct QuinnFactory {
     config: Arc<QuicClientConfig>,
     ep: Arc<Endpoint>,
+    connection: Arc<sync::Mutex<Option<Connection>>>,
     server_address: SocketAddr,
 }
 
 impl QuicFactory for QuinnFactory {
-    type Conn = Connection;
+    type Stream = QuinnStreamPair;
 
-    fn connect(&self) -> Pin<Box<dyn Future<Output = Result<Self::Conn, 
IggyError>> + Send>> {
+    fn connect(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> + 
Send>> {
         let ep  = self.ep.clone();
         let sn  = self.config.server_name.clone();
         let sa = self.server_address.clone();
+        let conn = self.connection.clone();
         Box::pin(async move {
+            let mut connection = conn.lock().await;
             let connecting = ep
                 .connect(sa, &sn)
                 .map_err(|_| IggyError::CannotEstablishConnection)?;
 
-            connecting
+            let new_conn = connecting
                 .await
-                .map_err(|_| IggyError::CannotEstablishConnection)
+                .map_err(|_| IggyError::CannotEstablishConnection)?;
+            let _ = connection.insert(new_conn);
+            Ok(())
+        })
+    }
+
+    fn open_stream(&self) -> Pin<Box<dyn Future<Output = Result<Self::Stream, 
IggyError>> + Send + '_>> {
+        let conn = self.connection.clone();
+        Box::pin(async move {
+            let guard = conn.lock().await;
+            let conn_ref = guard.as_ref().ok_or(IggyError::NotConnected)?;
+            let (send, recv) = conn_ref.open_bi().await.map_err(|e| {
+                error!("Failed to open a bidirectional stream: {e}");
+                IggyError::QuicError 
+            })?;
+            Ok(QuinnStreamPair { send, recv })
         })
     }
 }
@@ -72,7 +132,7 @@ impl QuinnFactory {
         let mut endpoint = endpoint.unwrap();
         endpoint.set_default_client_config(quic_config);
 
-        Ok(Self { config: cfg, ep: Arc::new(endpoint), server_address })
+        Ok(Self { config: cfg, ep: Arc::new(endpoint), server_address, 
connection: Arc::new(sync::Mutex::new(None)) })
     }
 }
 
diff --git a/core/sdk/src/driver/mod.rs b/core/sdk/src/driver/mod.rs
index 2472c64c..6357a7e3 100644
--- a/core/sdk/src/driver/mod.rs
+++ b/core/sdk/src/driver/mod.rs
@@ -7,7 +7,7 @@ use tokio::io::AsyncWriteExt;
 use tracing::{error, info, trace, warn};
 
 use crate::{
-    connection::quic::{QuicFactory, QuinnFactory},
+    connection::quic::{QuicFactory, QuinnFactory, StreamPair},
     proto::{
         connection::{IggyCore, InboundResult},
         runtime::{Runtime, sync},
@@ -40,49 +40,71 @@ where
         let nt = self.notify.clone();
         let core = self.core.clone();
         let q = self.factory.clone();
-        let cfg: Arc<QuicClientConfig> = self.config.clone();
+        let cfg = self.config.clone();
         let pending = self.pending.clone();
         rt.spawn(Box::pin(async move {
+            if let Err(e) = q.connect().await {
+                error!("Failed to connect: {e}");
+                return;
+            }
             loop {
                 nt.notified().await;
 
-                while let Some(data) = core.lock().await.poll_transmit() {
+                while let Some(data) = {
+                    let mut guard = core.lock().await;
+                    guard.poll_transmit()
+                } {
                     if !pending.contains_key(&data.id) {
                         error!("Failed to get transport adapter id");
                         continue;
                     }
 
-                    let connection = q.connect().await.unwrap(); // todo 
дорогая операция, перенести хранение connection в структуру quic
-                    let (mut send, mut recv) = connection
-                        .open_bi()
-                        .await
-                        .map_err(|error| {
-                            error!("Failed to open a bidirectional stream: 
{error}");
-                            IggyError::QuicError
-                        })
-                        .unwrap();
-                    send.write_vectored(&data.as_slices()).await; // TODO add 
map_err
-                    send.finish().unwrap();
+                    let mut stream = match q.open_stream().await {
+                        Ok(s) => s,
+                        Err(e) => {
+                            error!("Failed to open a bidirectional stream: 
{e}");
+                            continue;
+                        }
+                    };
+
+                    if let Err(e) = 
stream.send_vectored(&data.as_slices()).await {
+                        error!("Failed to send vectored: {e}");
+                        continue;
+                    }
 
-                    let mut n = cfg.response_buffer_size as usize;
+                    let mut at_most = cfg.response_buffer_size as usize;
                     loop {
-                        let buffer = recv
-                            .read_to_end(n)
-                            .await
-                            .map_err(|error| {
-                                error!("Failed to read response data: 
{error}");
-                                IggyError::QuicError
-                            })
-                            .unwrap();
-                        match core.lock().await.feed_inbound(&buffer) {
-                            InboundResult::Need(need) => n = need,
+                        let buffer = match stream.read_chunk(at_most).await {
+                            Ok(Some(buf)) => buf,
+                            Ok(None) => {
+                                error!("Unexpected EOF in stream");
+                                break;
+                            }
+                            Err(e) => {
+                                error!("Failed to read response data: {e}");
+                                break;
+                            }
+                        };
+
+                        let inbound = {
+                            let mut guard = core.lock().await;
+                            guard.feed_inbound(&buffer)
+                        };
+
+                        match inbound {
+                            InboundResult::Need(need) => at_most = need,
                             InboundResult::Response(r) => {
                                 if let Some((_key, tx)) = 
pending.remove(&data.id) {
                                     let _ = tx.send(r);
                                 }
+                                let mut guard = core.lock().await;
+                                guard.mark_tx_done();
+                                break;
                             }
                             InboundResult::Error(e) => {
-                                // todo add handle error
+                                let mut guard = core.lock().await;
+                                guard.mark_tx_done();
+                                break;
                             }
                         }
                     }
diff --git a/core/sdk/src/proto/connection.rs b/core/sdk/src/proto/connection.rs
index 1bcc581f..19c04ebb 100644
--- a/core/sdk/src/proto/connection.rs
+++ b/core/sdk/src/proto/connection.rs
@@ -40,6 +40,7 @@ impl TxBuf {
     }
 }
 
+#[derive(Debug)]
 pub enum Order {
     Outbound(Box<dyn Command>),
     State(ClientState),
@@ -59,15 +60,15 @@ pub enum InboundResult {
 pub struct IggyCore {
     state: ClientState,
     last_connect: Option<IggyTimestamp>,
-    pending: VecDeque<(u32 /* code */, Bytes /* payload */)>,
+    pending: VecDeque<(u32 /* code */, Bytes /* payload */, u64 /* 
transport_id */)>,
     config: Arc<dyn TransportConfig + Send + Sync + 'static>, // todo rewrite 
via generic
     retry_count: u32,
-    current_tx: Option<TxBuf>,
+    current_tx: Option<Arc<TxBuf>>,
     rx_buf: BytesMut,
 }
 
 impl IggyCore {
-    pub fn write(&mut self, cmd: &impl Command) -> Result<(), IggyError> {
+    pub fn write(&mut self, cmd: &impl Command, id: u64) -> Result<(), 
IggyError> {
         match self.state {
             ClientState::Shutdown => {
                 trace!("Cannot send data. Client is shutdown.");
@@ -83,7 +84,7 @@ impl IggyCore {
             }
             _ => {}
         }
-        self.pending.push_back((cmd.code(), cmd.to_bytes()));
+        self.pending.push_back((cmd.code(), cmd.to_bytes(), id));
         Ok(())
     }
 
@@ -140,19 +141,23 @@ impl IggyCore {
         Ok(Order::Reconnect)
     }
 
-    // TODO вызывать при async fn poll
-    pub fn poll_transmit(&mut self) -> Option<&TxBuf> {
+    pub fn poll_transmit(&mut self) -> Option<Arc<TxBuf>> {
         if self.current_tx.is_none() {
-            let (code, payload) = self.pending.pop_front()?;
+            let (code, payload, id) = self.pending.pop_front()?;
             let len = (payload.len() + REQUEST_INITIAL_BYTES_LENGTH) as u32;
 
-            self.current_tx = Some(TxBuf {
+            self.current_tx = Some(Arc::new(TxBuf{
                 hdr_len: len.to_le_bytes(),
                 hdr_code: code.to_le_bytes(),
-                payload,
-            });
+                payload, 
+                id,
+            }));
         }
-        self.current_tx.as_ref()
+        self.current_tx.as_ref().cloned()
+    }
+
+    pub fn mark_tx_done(&mut self) {
+        self.current_tx = None
     }
 
     pub fn feed_inbound(&mut self, bytes: &[u8]) -> InboundResult {
@@ -199,4 +204,14 @@ impl IggyCore {
         let body = full.split_off(8).freeze();
         InboundResult::Response(body)
     }
+
+    pub fn on_transport_connected(&mut self) {
+        self.state        = ClientState::Connected;
+        self.retry_count  = 0;
+        self.last_connect = Some(IggyTimestamp::now());
+    }
+
+    pub fn on_transport_disconnected(&mut self) {
+        self.state = ClientState::Disconnected;
+    }
 }
diff --git a/core/sdk/src/proto/runtime.rs b/core/sdk/src/proto/runtime.rs
index 6f4981be..b3227bf2 100644
--- a/core/sdk/src/proto/runtime.rs
+++ b/core/sdk/src/proto/runtime.rs
@@ -1,4 +1,4 @@
-use std::{ops::{Deref, DerefMut}, pin::Pin};
+use std::{ops::{Deref, DerefMut}, pin::Pin, time::Duration};
 
 use iggy_common::IggyError;
 
@@ -13,6 +13,7 @@ pub mod sync {
 
 pub trait Runtime: Sync + Send + 'static {
     fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>);
+    fn sleep(&self, duration: Duration) -> Pin<Box<dyn Future<Output = ()> + 
Send>>;
 }
 
 #[cfg(feature = "runtime_tokio")]
diff --git a/core/sdk/src/quic/quick_client.rs 
b/core/sdk/src/quic/quick_client.rs
index 9317cd27..e3ce4419 100644
--- a/core/sdk/src/quic/quick_client.rs
+++ b/core/sdk/src/quic/quick_client.rs
@@ -20,6 +20,7 @@ use crate::prelude::AutoLogin;
 use iggy_binary_protocol::{
     BinaryClient, BinaryTransport, Client, PersonalAccessTokenClient, 
UserClient,
 };
+use tokio::io::AsyncWriteExt;
 
 use crate::prelude::{IggyDuration, IggyError, IggyTimestamp, QuicClientConfig};
 use crate::quic::skip_server_verification::SkipServerVerification;
@@ -458,7 +459,6 @@ impl QuicClient {
                 IggyError::QuicError
             })?;
             trace!("Sending a QUIC request with code: {code}");
-            
             send.write_all(&(payload_length as u32).to_le_bytes())
                 .await
                 .map_err(|error| {
diff --git a/core/sdk/src/transport_adapter/async.rs 
b/core/sdk/src/transport_adapter/async.rs
index 7271ed1a..78960bbc 100644
--- a/core/sdk/src/transport_adapter/async.rs
+++ b/core/sdk/src/transport_adapter/async.rs
@@ -1,57 +1,81 @@
-use std::{pin::Pin, sync::{atomic::AtomicU64, Arc, Mutex}};
+use std::{
+    pin::Pin,
+    sync::{Arc, Mutex, atomic::AtomicU64},
+};
 
 use bytes::Bytes;
 use iggy_common::{Command, IggyError};
 use tokio::sync::Notify;
+use tracing::{error, trace};
 
-use crate::{connection::quic::QuicFactory, proto::{connection::{IggyCore, 
Order}, runtime::{self, sync, Lockable, Runtime}}, transport_adapter::{RespFut, 
TransportAdapter}};
+use crate::{
+    connection::quic::QuicFactory,
+    driver::Driver,
+    proto::{
+        connection::{IggyCore, Order},
+        runtime::{self, Runtime, sync},
+    },
+    transport_adapter::RespFut,
+};
 
-pub struct AsyncTransportAdapter<F: QuicFactory, R: Runtime> {
+pub struct AsyncTransportAdapter<F: QuicFactory, R: Runtime, D: Driver> {
     factory: Arc<F>,
     rt: Arc<R>,
     core: sync::Mutex<IggyCore>,
     notify: Arc<Notify>,
     id: AtomicU64,
-    driver: Driver,
+    driver: Arc<D>,
 }
 
-impl<F, R> AsyncTransportAdapter<F, R>
+impl<F, R, D> AsyncTransportAdapter<F, R, D>
 where
     F: QuicFactory + Send + Sync + 'static,
-    R: Runtime,
+    R: Runtime + Send + Sync + 'static,
+    D: Driver + Send + Sync,
 {
-    pub fn send_with_response<'a, T: Command>(&'a self, command: &'a T) -> 
Pin<Box<dyn Future<Output = Result<RespFut, IggyError>> + Send + 'a >> {
+    pub fn send_with_response<'a, T: Command>(
+        &'a self,
+        command: &'a T,
+    ) -> Pin<Box<dyn Future<Output = Result<RespFut, IggyError>> + Send + Sync 
+ 'a>> {
         Box::pin(async move {
             let (tx, rx) = runtime::oneshot::<Bytes>();
             let current_id = self.id.fetch_add(1, 
std::sync::atomic::Ordering::SeqCst);
-            
+
             self.core.lock().await.write(command, current_id)?;
-            self.driver.register(current_id);
-            self.notify.notify_one();
+            self.driver.register(current_id, tx);
+            self.notify.notify_waiters();
 
-            Ok(RespFut{rx: rx})
+            Ok(RespFut { rx: rx })
         })
     }
 
     pub async fn connect(&self) -> Result<(), IggyError> {
-        let connect = self.core.lock().await.start_connect()?;
+        let mut order = self.core.lock().await.start_connect()?;
         loop {
-            match connect {
-                Order::Reconnect => {
-                    
+            match order {
+                Order::Wait(dur) => {
+                    self.rt.sleep(dur.get_duration()).await;
+                    order = self.core.lock().await.poll_connect()?;
                 }
+
+                Order::Reconnect => match self.factory.connect().await {
+                    Ok(()) => {
+                        self.core.lock().await.on_transport_connected();
+                        return Ok(());
+                    }
+                    Err(e) => {
+                        self.core.lock().await.on_transport_disconnected();
+                        order = self.core.lock().await.poll_connect()?;
+                        if matches!(order, Order::Noop) {
+                            return Err(e);
+                        }
+                    }
+                },
+
+                Order::Noop => return Ok(()),
+
+                _ => return Err(IggyError::CannotEstablishConnection),
             }
         }
     }
 }
-/*
-// tood для transprot
-    fn send_with_response<'a, T: iggy_common::Command>(&'a self, command: &'a 
T) -> Pin<Box<dyn Future<Output = Result<bytes::Bytes, iggy_common::IggyError>> 
+ Send + 'a>> {
-        Box::pin(async move {
-            self.core.lock().await.write(command)?;
-
-            Ok(Bytes::new())
-        })
-    }
-
-*/
\ No newline at end of file

Reply via email to