This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/draft-sans by this push:
     new c5e8c170 del
c5e8c170 is described below

commit c5e8c1703f244f67086d2929c3634caec6448cdd
Author: haze518 <[email protected]>
AuthorDate: Wed Jul 9 10:10:47 2025 +0600

    del
---
 core/sdk/src/connection/mod.rs          | 10 ++++++++
 core/sdk/src/connection/quic/mod.rs     | 40 ++++++++++++++++++++++++------
 core/sdk/src/connection/tcp/mod.rs      |  1 +
 core/sdk/src/transport_adapter/async.rs | 35 ++++++++++++++++++---------
 core/sdk/src/transport_adapter/quic.rs  | 43 ---------------------------------
 5 files changed, 67 insertions(+), 62 deletions(-)

diff --git a/core/sdk/src/connection/mod.rs b/core/sdk/src/connection/mod.rs
index 85a4c13f..d65a1318 100644
--- a/core/sdk/src/connection/mod.rs
+++ b/core/sdk/src/connection/mod.rs
@@ -1,2 +1,12 @@
+use std::pin::Pin;
+
+use iggy_common::IggyError;
+
 pub mod tcp;
 pub mod quic;
+
+pub trait ConnectionFactory { // lifecycle operations of a connection factory; every method returns a boxed, pinned future
+    fn connect(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> + 
Send + Sync>>;
+    fn is_alive(&self) -> Pin<Box<dyn Future<Output = bool>>>; // NOTE(review): unlike connect/shutdown, this future carries no Send bound — confirm this is intentional
+    fn shutdown(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> + 
Send + Sync>>;
+}
diff --git a/core/sdk/src/connection/quic/mod.rs 
b/core/sdk/src/connection/quic/mod.rs
index 67d1683c..422e7f38 100644
--- a/core/sdk/src/connection/quic/mod.rs
+++ b/core/sdk/src/connection/quic/mod.rs
@@ -6,6 +6,7 @@ use iggy_common::{IggyError, QuicClientConfig};
 use rustls::crypto::CryptoProvider;
 use tokio::io::AsyncWriteExt;
 use tracing::{error, warn};
+use crate::connection::ConnectionFactory;
 use crate::proto::runtime::sync;
 use crate::quic::skip_server_verification::SkipServerVerification;
 
@@ -17,11 +18,9 @@ pub trait StreamPair: Send {
     fn read_chunk<'a>(&'a mut self, at_most: usize) -> Pin<Box<dyn 
Future<Output = Result<Option<Bytes>, IggyError>> + Send + 'a>>;
 }
 
-pub trait QuicFactory {
+pub trait QuicFactory: ConnectionFactory {
     type Stream: StreamPair;
-    
-    fn connect(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> + 
Send>>;
-    
+
     fn open_stream(&self) -> Pin<Box<dyn Future<Output = Result<Self::Stream, 
IggyError>> + Send + '_>>;
 }
 
@@ -66,10 +65,8 @@ pub struct QuinnFactory {
     server_address: SocketAddr,
 }
 
-impl QuicFactory for QuinnFactory {
-    type Stream = QuinnStreamPair;
-
-    fn connect(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> + 
Send>> {
+impl ConnectionFactory for QuinnFactory {
+    fn connect(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> + 
Send + Sync>> {
         let ep  = self.ep.clone();
         let sn  = self.config.server_name.clone();
         let sa = self.server_address.clone();
@@ -88,6 +85,33 @@ impl QuicFactory for QuinnFactory {
         })
     }
 
+    fn is_alive(&self) -> Pin<Box<dyn Future<Output = bool>>> { // resolves to true only while a connection exists and is still open
+        let conn = self.connection.clone();
+        Box::pin(async move {
+            let conn = conn.lock().await;
+            match conn.as_ref() {
+                Some(c) => c.close_reason().is_none(), // a close reason is present only once the connection has closed
+                None => false, // no connection stored (never connected, or taken by shutdown)
+            }
+        })
+    }
+
+    fn shutdown(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> + 
Send + Sync>> { // closes the stored connection (if any), then waits for the endpoint to become idle
+        let conn = self.connection.clone();
+        let ep  = self.ep.clone();
+        Box::pin(async move {
+            if let Some(conn) = conn.lock().await.take() { // take() leaves None in the shared slot
+                conn.close(0u32.into(), b""); // close with code 0 and an empty reason
+            }
+            ep.wait_idle().await;
+            Ok(())
+        })
+    }
+}
+
+impl QuicFactory for QuinnFactory {
+    type Stream = QuinnStreamPair;
+
     fn open_stream(&self) -> Pin<Box<dyn Future<Output = Result<Self::Stream, 
IggyError>> + Send + '_>> {
         let conn = self.connection.clone();
         Box::pin(async move {
diff --git a/core/sdk/src/connection/tcp/mod.rs 
b/core/sdk/src/connection/tcp/mod.rs
index 3d133a45..f4774631 100644
--- a/core/sdk/src/connection/tcp/mod.rs
+++ b/core/sdk/src/connection/tcp/mod.rs
@@ -12,3 +12,4 @@ pub trait SocketFactory {
 
     fn connect(&self) -> Pin<Box<dyn Future<Output = io::Result<Self::Stream>> 
+ Send>>;
 }
+
diff --git a/core/sdk/src/transport_adapter/async.rs 
b/core/sdk/src/transport_adapter/async.rs
index af98e323..e0a90446 100644
--- a/core/sdk/src/transport_adapter/async.rs
+++ b/core/sdk/src/transport_adapter/async.rs
@@ -10,16 +10,16 @@ use tokio::sync::Notify;
 use tracing::{error, trace};
 
 use crate::{
-    connection::quic::QuicFactory,
+    connection::{quic::QuicFactory, ConnectionFactory},
     driver::Driver,
     proto::{
         connection::{IggyCore, Order},
-        runtime::{self, Runtime, sync},
+        runtime::{self, sync, Runtime},
     },
     transport_adapter::RespFut,
 };
 
-pub struct AsyncTransportAdapter<F: QuicFactory, R: Runtime, D: Driver> {
+pub struct AsyncTransportAdapter<F: ConnectionFactory, R: Runtime, D: Driver> {
     factory: Arc<F>,
     rt: Arc<R>,
     core: sync::Mutex<IggyCore>,
@@ -31,15 +31,13 @@ pub struct AsyncTransportAdapter<F: QuicFactory, R: 
Runtime, D: Driver> {
 
 impl<F, R, D> AsyncTransportAdapter<F, R, D>
 where
-    F: QuicFactory + Send + Sync + 'static,
+    F: ConnectionFactory + Send + Sync + 'static,
     R: Runtime + Send + Sync + 'static,
     D: Driver + Send + Sync,
 {
-    pub fn send_with_response<'a, T: Command>(
-        &'a self,
-        command: &'a T,
-    ) -> Pin<Box<dyn Future<Output = Result<RespFut, IggyError>> + Send + Sync 
+ 'a>> {
-        Box::pin(async move {
+    async fn send_with_response<T: Command>(&self, command: &T) -> 
Result<RespFut, IggyError> {
+            self.ensure_connected().await?;
+    
             let (tx, rx) = runtime::oneshot::<Bytes>();
             let current_id = self.id.fetch_add(1, 
std::sync::atomic::Ordering::SeqCst);
 
@@ -48,7 +46,6 @@ where
             self.notify.notify_waiters();
 
             Ok(RespFut { rx: rx })
-        })
     }
 
     pub async fn connect(&self) -> Result<(), IggyError> {
@@ -85,11 +82,27 @@ where
             }
         }
     }
-    // TODO add login/shutdown/disconnect
 
     async fn publish_event(&self, event: DiagnosticEvent) {
         if let Err(error) = self.events.0.broadcast(event).await {
             error!("Failed to send a QUIC diagnostic event: {error}");
         }
     }
+
+    async fn ensure_connected(&self) -> Result<(), IggyError> { // no-op when the factory reports a live connection; otherwise shuts down and reconnects
+        if self.factory.is_alive().await {
+            return Ok(())
+        }
+        self.shutdown().await?; // a shutdown error is returned as-is and no reconnect is attempted
+        self.connect().await
+    }
+
+    async fn shutdown(&self) -> Result<(), IggyError> { // notifies the core of the disconnect, shuts the factory down, then publishes a Shutdown event
+        self.core.lock().await.on_transport_disconnected();
+        self.factory.shutdown().await?; // on error the Shutdown event below is not published
+        self.publish_event(DiagnosticEvent::Shutdown).await;
+        Ok(())
+    }
+
+    // TODO add async fn login
 }
diff --git a/core/sdk/src/transport_adapter/quic.rs 
b/core/sdk/src/transport_adapter/quic.rs
deleted file mode 100644
index 8900df5f..00000000
--- a/core/sdk/src/transport_adapter/quic.rs
+++ /dev/null
@@ -1,43 +0,0 @@
-use std::{collections::VecDeque, pin::Pin, sync::{Arc, Mutex}, task::Waker};
-
-use bytes::Bytes;
-use futures::{channel::oneshot, FutureExt};
-use iggy_common::{Command, IggyError};
-
-use crate::{connection::quic::QuicFactory, proto::{connection::IggyCore, 
runtime::{self, Lockable, Runtime}}, transport_adapter::{RespFut, 
TransportAdapter}};
-
-pub struct QuicAdapter<F: QuicFactory, R: Runtime> {
-    factory: Arc<F>,
-    rt: Arc<R>,
-    core: R::Mutex<IggyCore>,
-    waiters: VecDeque<Waker>
-}
-
-impl<F, R> TransportAdapter for QuicAdapter<F, R>
-where
-    F: QuicFactory + Send + Sync + 'static,
-    R: Runtime,
-{
-    fn send_with_response<'a, T: Command>(&'a self, command: &'a T) -> 
Pin<Box<dyn Future<Output = Result<RespFut, IggyError>> + Send + 'a >> {
-        Box::pin(async move {
-            let (tx, rx) = runtime::oneshot::<Bytes>();
-            self.core.lock().await.write(command)?;
-            self.waiters.push_back(value);
-            Ok(RespFut { rx: rx })
-        })
-    }
-}
-
-
-
-/*
-// tood для transprot
-    fn send_with_response<'a, T: iggy_common::Command>(&'a self, command: &'a 
T) -> Pin<Box<dyn Future<Output = Result<bytes::Bytes, iggy_common::IggyError>> 
+ Send + 'a>> {
-        Box::pin(async move {
-            self.core.lock().await.write(command)?;
-
-            Ok(Bytes::new())
-        })
-    }
-
-*/
\ No newline at end of file

Reply via email to