This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/draft-sans by this push:
     new 813d8fdd del
813d8fdd is described below

commit 813d8fdd0c05734e5d23922e9c8f60d5bcf86964
Author: haze518 <[email protected]>
AuthorDate: Wed Jul 9 07:47:30 2025 +0600

    del
---
 core/sdk/src/connection/quic/mod.rs     | 14 ++++++++++----
 core/sdk/src/driver/mod.rs              |  4 ----
 core/sdk/src/transport_adapter/async.rs | 18 ++++++++++++++++--
 3 files changed, 26 insertions(+), 10 deletions(-)

diff --git a/core/sdk/src/connection/quic/mod.rs b/core/sdk/src/connection/quic/mod.rs
index c120f153..67d1683c 100644
--- a/core/sdk/src/connection/quic/mod.rs
+++ b/core/sdk/src/connection/quic/mod.rs
@@ -21,6 +21,7 @@ pub trait QuicFactory {
     type Stream: StreamPair;
     
     fn connect(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> + Send>>;
+    
     fn open_stream(&self) -> Pin<Box<dyn Future<Output = Result<Self::Stream, IggyError>> + Send + '_>>;
 }
 
@@ -36,7 +37,10 @@ impl StreamPair for QuinnStreamPair {
                 error!("Failed to write vectored buffs to quic conn: {e}");
                 IggyError::QuicError
             })?;
-            self.send.finish();
+            self.send.finish().map_err(|e: quinn::ClosedStream| {
+                error!("Failed to finish sending data: {e}");
+                IggyError::QuicError
+            })?;
             Ok(())
         })
     }
@@ -87,9 +91,11 @@ impl QuicFactory for QuinnFactory {
     fn open_stream(&self) -> Pin<Box<dyn Future<Output = Result<Self::Stream, IggyError>> + Send + '_>> {
         let conn = self.connection.clone();
         Box::pin(async move {
-            let guard = conn.lock().await;
-            let conn_ref = guard.as_ref().ok_or(IggyError::NotConnected)?;
-            let (send, recv) = conn_ref.open_bi().await.map_err(|e| {
+            let conn = {
+                let guard = conn.lock().await;
+                guard.clone().ok_or(IggyError::NotConnected)?
+            };
+            let (send, recv) = conn.open_bi().await.map_err(|e| {
                 error!("Failed to open a bidirectional stream: {e}");
                 IggyError::QuicError 
             })?;
diff --git a/core/sdk/src/driver/mod.rs b/core/sdk/src/driver/mod.rs
index 6357a7e3..114a12f7 100644
--- a/core/sdk/src/driver/mod.rs
+++ b/core/sdk/src/driver/mod.rs
@@ -43,10 +43,6 @@ where
         let cfg = self.config.clone();
         let pending = self.pending.clone();
         rt.spawn(Box::pin(async move {
-            if let Err(e) = q.connect().await {
-                error!("Failed to connect: {e}");
-                return;
-            }
             loop {
                 nt.notified().await;
 
diff --git a/core/sdk/src/transport_adapter/async.rs b/core/sdk/src/transport_adapter/async.rs
index 78960bbc..af98e323 100644
--- a/core/sdk/src/transport_adapter/async.rs
+++ b/core/sdk/src/transport_adapter/async.rs
@@ -2,9 +2,10 @@ use std::{
     pin::Pin,
     sync::{Arc, Mutex, atomic::AtomicU64},
 };
+use async_broadcast::{Receiver, Sender, broadcast};
 
 use bytes::Bytes;
-use iggy_common::{Command, IggyError};
+use iggy_common::{Command, DiagnosticEvent, IggyError};
 use tokio::sync::Notify;
 use tracing::{error, trace};
 
@@ -25,6 +26,7 @@ pub struct AsyncTransportAdapter<F: QuicFactory, R: Runtime, D: Driver> {
     notify: Arc<Notify>,
     id: AtomicU64,
     driver: Arc<D>,
+    events: (Sender<DiagnosticEvent>, Receiver<DiagnosticEvent>),
 }
 
 impl<F, R, D> AsyncTransportAdapter<F, R, D>
@@ -61,12 +63,14 @@ where
                 Order::Reconnect => match self.factory.connect().await {
                     Ok(()) => {
                         self.core.lock().await.on_transport_connected();
+                        self.publish_event(DiagnosticEvent::Connected).await;
                         return Ok(());
                     }
                     Err(e) => {
                         self.core.lock().await.on_transport_disconnected();
                         order = self.core.lock().await.poll_connect()?;
                         if matches!(order, Order::Noop) {
+                            self.publish_event(DiagnosticEvent::Disconnected).await;
                             return Err(e);
                         }
                     }
@@ -74,8 +78,18 @@ where
 
                 Order::Noop => return Ok(()),
 
-                _ => return Err(IggyError::CannotEstablishConnection),
+                _ => {
+                    self.publish_event(DiagnosticEvent::Disconnected).await;
+                    return Err(IggyError::CannotEstablishConnection)
+                },
             }
         }
     }
+    // TODO add login/shutdown/disconnect
+
+    async fn publish_event(&self, event: DiagnosticEvent) {
+        if let Err(error) = self.events.0.broadcast(event).await {
+            error!("Failed to send a QUIC diagnostic event: {error}");
+        }
+    }
 }

Reply via email to