This is an automated email from the ASF dual-hosted git repository.
bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/draft-sans by this push:
new c5e8c170 del
c5e8c170 is described below
commit c5e8c1703f244f67086d2929c3634caec6448cdd
Author: haze518 <[email protected]>
AuthorDate: Wed Jul 9 10:10:47 2025 +0600
del
---
core/sdk/src/connection/mod.rs | 10 ++++++++
core/sdk/src/connection/quic/mod.rs | 40 ++++++++++++++++++++++++------
core/sdk/src/connection/tcp/mod.rs | 1 +
core/sdk/src/transport_adapter/async.rs | 35 ++++++++++++++++++---------
core/sdk/src/transport_adapter/quic.rs | 43 ---------------------------------
5 files changed, 67 insertions(+), 62 deletions(-)
diff --git a/core/sdk/src/connection/mod.rs b/core/sdk/src/connection/mod.rs
index 85a4c13f..d65a1318 100644
--- a/core/sdk/src/connection/mod.rs
+++ b/core/sdk/src/connection/mod.rs
@@ -1,2 +1,12 @@
+use std::pin::Pin;
+
+use iggy_common::IggyError;
+
pub mod tcp;
pub mod quic;
+
+pub trait ConnectionFactory { // Connection lifecycle interface; every method hands back a pinned, boxed future.
+ fn connect(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> +
Send + Sync>>; // connect: the future resolves to Ok(()) or an IggyError.
+ fn is_alive(&self) -> Pin<Box<dyn Future<Output = bool>>>; // NOTE(review): unlike connect/shutdown, this future carries no Send bound - confirm that is intended.
+ fn shutdown(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> +
Send + Sync>>; // shutdown: the future resolves to Ok(()) or an IggyError.
+}
diff --git a/core/sdk/src/connection/quic/mod.rs
b/core/sdk/src/connection/quic/mod.rs
index 67d1683c..422e7f38 100644
--- a/core/sdk/src/connection/quic/mod.rs
+++ b/core/sdk/src/connection/quic/mod.rs
@@ -6,6 +6,7 @@ use iggy_common::{IggyError, QuicClientConfig};
use rustls::crypto::CryptoProvider;
use tokio::io::AsyncWriteExt;
use tracing::{error, warn};
+use crate::connection::ConnectionFactory;
use crate::proto::runtime::sync;
use crate::quic::skip_server_verification::SkipServerVerification;
@@ -17,11 +18,9 @@ pub trait StreamPair: Send {
fn read_chunk<'a>(&'a mut self, at_most: usize) -> Pin<Box<dyn
Future<Output = Result<Option<Bytes>, IggyError>> + Send + 'a>>;
}
-pub trait QuicFactory {
+pub trait QuicFactory: ConnectionFactory {
type Stream: StreamPair;
-
- fn connect(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> +
Send>>;
-
+
fn open_stream(&self) -> Pin<Box<dyn Future<Output = Result<Self::Stream,
IggyError>> + Send + '_>>;
}
@@ -66,10 +65,8 @@ pub struct QuinnFactory {
server_address: SocketAddr,
}
-impl QuicFactory for QuinnFactory {
- type Stream = QuinnStreamPair;
-
- fn connect(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> +
Send>> {
+impl ConnectionFactory for QuinnFactory {
+ fn connect(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> +
Send + Sync>> {
let ep = self.ep.clone();
let sn = self.config.server_name.clone();
let sa = self.server_address.clone();
@@ -88,6 +85,33 @@ impl QuicFactory for QuinnFactory {
})
}
+ /// Reports whether a connection is currently stored and still open.
+ ///
+ /// The returned future resolves to `false` when no connection has been
+ /// stored yet, or when the stored connection already has a close reason.
+ fn is_alive(&self) -> Pin<Box<dyn Future<Output = bool>>> {
+ let conn = self.connection.clone();
+ Box::pin(async move {
+ let conn = conn.lock().await;
+ match conn.as_ref() {
+ // `close_reason()` is `Some` only once the connection has been closed,
+ // so "alive" means there is no close reason yet.
+ Some(c) => c.close_reason().is_none(),
+ None => false,
+ }
+ })
+ }
+
+ fn shutdown(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> +
Send + Sync>> { // Closes the stored connection, if there is one, then awaits `ep.wait_idle()`.
+ let conn = self.connection.clone();
+ let ep = self.ep.clone();
+ Box::pin(async move {
+ if let Some(conn) = conn.lock().await.take() { // take() moves the connection out and leaves None in the shared slot
+ conn.close(0u32.into(), b""); // presumably (error code, reason bytes) - confirm against the quinn API
+ }
+ ep.wait_idle().await; // runs whether or not a connection was stored
+ Ok(())
+ })
+ }
+}
+
+impl QuicFactory for QuinnFactory {
+ type Stream = QuinnStreamPair;
+
fn open_stream(&self) -> Pin<Box<dyn Future<Output = Result<Self::Stream,
IggyError>> + Send + '_>> {
let conn = self.connection.clone();
Box::pin(async move {
diff --git a/core/sdk/src/connection/tcp/mod.rs
b/core/sdk/src/connection/tcp/mod.rs
index 3d133a45..f4774631 100644
--- a/core/sdk/src/connection/tcp/mod.rs
+++ b/core/sdk/src/connection/tcp/mod.rs
@@ -12,3 +12,4 @@ pub trait SocketFactory {
fn connect(&self) -> Pin<Box<dyn Future<Output = io::Result<Self::Stream>>
+ Send>>;
}
+
diff --git a/core/sdk/src/transport_adapter/async.rs
b/core/sdk/src/transport_adapter/async.rs
index af98e323..e0a90446 100644
--- a/core/sdk/src/transport_adapter/async.rs
+++ b/core/sdk/src/transport_adapter/async.rs
@@ -10,16 +10,16 @@ use tokio::sync::Notify;
use tracing::{error, trace};
use crate::{
- connection::quic::QuicFactory,
+ connection::{quic::QuicFactory, ConnectionFactory},
driver::Driver,
proto::{
connection::{IggyCore, Order},
- runtime::{self, Runtime, sync},
+ runtime::{self, sync, Runtime},
},
transport_adapter::RespFut,
};
-pub struct AsyncTransportAdapter<F: QuicFactory, R: Runtime, D: Driver> {
+pub struct AsyncTransportAdapter<F: ConnectionFactory, R: Runtime, D: Driver> {
factory: Arc<F>,
rt: Arc<R>,
core: sync::Mutex<IggyCore>,
@@ -31,15 +31,13 @@ pub struct AsyncTransportAdapter<F: QuicFactory, R:
Runtime, D: Driver> {
impl<F, R, D> AsyncTransportAdapter<F, R, D>
where
- F: QuicFactory + Send + Sync + 'static,
+ F: ConnectionFactory + Send + Sync + 'static,
R: Runtime + Send + Sync + 'static,
D: Driver + Send + Sync,
{
- pub fn send_with_response<'a, T: Command>(
- &'a self,
- command: &'a T,
- ) -> Pin<Box<dyn Future<Output = Result<RespFut, IggyError>> + Send + Sync
+ 'a>> {
- Box::pin(async move {
+ async fn send_with_response<T: Command>(&self, command: &T) ->
Result<RespFut, IggyError> {
+ self.ensure_connected().await?;
+
let (tx, rx) = runtime::oneshot::<Bytes>();
let current_id = self.id.fetch_add(1,
std::sync::atomic::Ordering::SeqCst);
@@ -48,7 +46,6 @@ where
self.notify.notify_waiters();
Ok(RespFut { rx: rx })
- })
}
pub async fn connect(&self) -> Result<(), IggyError> {
@@ -85,11 +82,27 @@ where
}
}
}
- // TODO add login/shutdown/disconnect
async fn publish_event(&self, event: DiagnosticEvent) { // Broadcasts `event` on `self.events.0`; a failure is logged and not returned to the caller.
if let Err(error) = self.events.0.broadcast(event).await {
error!("Failed to send a QUIC diagnostic event: {error}"); // NOTE(review): message still says "QUIC" although the adapter is now generic over ConnectionFactory
}
}
+
+ async fn ensure_connected(&self) -> Result<(), IggyError> { // Returns Ok at once when the factory reports a live connection; otherwise shuts down and connects again.
+ if self.factory.is_alive().await {
+ return Ok(())
+ }
+ self.shutdown().await?; // an error here returns early, before connect is attempted
+ self.connect().await
+ }
+
+ async fn shutdown(&self) -> Result<(), IggyError> { // Notifies the protocol core, shuts the factory down, then publishes a Shutdown diagnostic event.
+ self.core.lock().await.on_transport_disconnected(); // the core is told first, before the factory is shut down
+ self.factory.shutdown().await?; // on error the Shutdown event below is not published
+ self.publish_event(DiagnosticEvent::Shutdown).await;
+ Ok(())
+ }
+
+ // TODO add async fn login
}
diff --git a/core/sdk/src/transport_adapter/quic.rs
b/core/sdk/src/transport_adapter/quic.rs
deleted file mode 100644
index 8900df5f..00000000
--- a/core/sdk/src/transport_adapter/quic.rs
+++ /dev/null
@@ -1,43 +0,0 @@
-use std::{collections::VecDeque, pin::Pin, sync::{Arc, Mutex}, task::Waker};
-
-use bytes::Bytes;
-use futures::{channel::oneshot, FutureExt};
-use iggy_common::{Command, IggyError};
-
-use crate::{connection::quic::QuicFactory, proto::{connection::IggyCore,
runtime::{self, Lockable, Runtime}}, transport_adapter::{RespFut,
TransportAdapter}};
-
-pub struct QuicAdapter<F: QuicFactory, R: Runtime> {
- factory: Arc<F>,
- rt: Arc<R>,
- core: R::Mutex<IggyCore>,
- waiters: VecDeque<Waker>
-}
-
-impl<F, R> TransportAdapter for QuicAdapter<F, R>
-where
- F: QuicFactory + Send + Sync + 'static,
- R: Runtime,
-{
- fn send_with_response<'a, T: Command>(&'a self, command: &'a T) ->
Pin<Box<dyn Future<Output = Result<RespFut, IggyError>> + Send + 'a >> {
- Box::pin(async move {
- let (tx, rx) = runtime::oneshot::<Bytes>();
- self.core.lock().await.write(command)?;
- self.waiters.push_back(value);
- Ok(RespFut { rx: rx })
- })
- }
-}
-
-
-
-/*
-// tood для transprot
- fn send_with_response<'a, T: iggy_common::Command>(&'a self, command: &'a
T) -> Pin<Box<dyn Future<Output = Result<bytes::Bytes, iggy_common::IggyError>>
+ Send + 'a>> {
- Box::pin(async move {
- self.core.lock().await.write(command)?;
-
- Ok(Bytes::new())
- })
- }
-
-*/
\ No newline at end of file