This is an automated email from the ASF dual-hosted git repository.
bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/draft-sans by this push:
new 813d8fdd del
813d8fdd is described below
commit 813d8fdd0c05734e5d23922e9c8f60d5bcf86964
Author: haze518 <[email protected]>
AuthorDate: Wed Jul 9 07:47:30 2025 +0600
del
---
core/sdk/src/connection/quic/mod.rs | 14 ++++++++++----
core/sdk/src/driver/mod.rs | 4 ----
core/sdk/src/transport_adapter/async.rs | 18 ++++++++++++++++--
3 files changed, 26 insertions(+), 10 deletions(-)
diff --git a/core/sdk/src/connection/quic/mod.rs b/core/sdk/src/connection/quic/mod.rs
index c120f153..67d1683c 100644
--- a/core/sdk/src/connection/quic/mod.rs
+++ b/core/sdk/src/connection/quic/mod.rs
@@ -21,6 +21,7 @@ pub trait QuicFactory {
type Stream: StreamPair;
fn connect(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> + Send>>;
+
fn open_stream(&self) -> Pin<Box<dyn Future<Output = Result<Self::Stream, IggyError>> + Send + '_>>;
}
@@ -36,7 +37,10 @@ impl StreamPair for QuinnStreamPair {
error!("Failed to write vectored buffs to quic conn: {e}");
IggyError::QuicError
})?;
- self.send.finish();
+ self.send.finish().map_err(|e: quinn::ClosedStream| {
+ error!("Failed to finish sending data: {e}");
+ IggyError::QuicError
+ })?;
Ok(())
})
}
@@ -87,9 +91,11 @@ impl QuicFactory for QuinnFactory {
fn open_stream(&self) -> Pin<Box<dyn Future<Output = Result<Self::Stream, IggyError>> + Send + '_>> {
let conn = self.connection.clone();
Box::pin(async move {
- let guard = conn.lock().await;
- let conn_ref = guard.as_ref().ok_or(IggyError::NotConnected)?;
- let (send, recv) = conn_ref.open_bi().await.map_err(|e| {
+ let conn = {
+ let guard = conn.lock().await;
+ guard.clone().ok_or(IggyError::NotConnected)?
+ };
+ let (send, recv) = conn.open_bi().await.map_err(|e| {
error!("Failed to open a bidirectional stream: {e}");
IggyError::QuicError
})?;
diff --git a/core/sdk/src/driver/mod.rs b/core/sdk/src/driver/mod.rs
index 6357a7e3..114a12f7 100644
--- a/core/sdk/src/driver/mod.rs
+++ b/core/sdk/src/driver/mod.rs
@@ -43,10 +43,6 @@ where
let cfg = self.config.clone();
let pending = self.pending.clone();
rt.spawn(Box::pin(async move {
- if let Err(e) = q.connect().await {
- error!("Failed to connect: {e}");
- return;
- }
loop {
nt.notified().await;
diff --git a/core/sdk/src/transport_adapter/async.rs b/core/sdk/src/transport_adapter/async.rs
index 78960bbc..af98e323 100644
--- a/core/sdk/src/transport_adapter/async.rs
+++ b/core/sdk/src/transport_adapter/async.rs
@@ -2,9 +2,10 @@ use std::{
pin::Pin,
sync::{Arc, Mutex, atomic::AtomicU64},
};
+use async_broadcast::{Receiver, Sender, broadcast};
use bytes::Bytes;
-use iggy_common::{Command, IggyError};
+use iggy_common::{Command, DiagnosticEvent, IggyError};
use tokio::sync::Notify;
use tracing::{error, trace};
@@ -25,6 +26,7 @@ pub struct AsyncTransportAdapter<F: QuicFactory, R: Runtime, D: Driver> {
notify: Arc<Notify>,
id: AtomicU64,
driver: Arc<D>,
+ events: (Sender<DiagnosticEvent>, Receiver<DiagnosticEvent>),
}
impl<F, R, D> AsyncTransportAdapter<F, R, D>
@@ -61,12 +63,14 @@ where
Order::Reconnect => match self.factory.connect().await {
Ok(()) => {
self.core.lock().await.on_transport_connected();
+ self.publish_event(DiagnosticEvent::Connected).await;
return Ok(());
}
Err(e) => {
self.core.lock().await.on_transport_disconnected();
order = self.core.lock().await.poll_connect()?;
if matches!(order, Order::Noop) {
+ self.publish_event(DiagnosticEvent::Disconnected).await;
return Err(e);
}
}
@@ -74,8 +78,18 @@ where
Order::Noop => return Ok(()),
- _ => return Err(IggyError::CannotEstablishConnection),
+ _ => {
+ self.publish_event(DiagnosticEvent::Disconnected).await;
+ return Err(IggyError::CannotEstablishConnection)
+ },
}
}
}
+ // TODO add login/shutdown/disconnect
+
+ async fn publish_event(&self, event: DiagnosticEvent) {
+ if let Err(error) = self.events.0.broadcast(event).await {
+ error!("Failed to send a QUIC diagnostic event: {error}");
+ }
+ }
}