This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/draft-sans by this push:
     new 178f9c19 del
178f9c19 is described below

commit 178f9c19cf4aa716c2e3f8a74921dd0b850efe42
Author: haze518 <[email protected]>
AuthorDate: Sat Jul 5 07:34:24 2025 +0600

    del
---
 core/sdk/src/connection/quic/mod.rs    | 146 +++++++++++++++++++++++++++++----
 core/sdk/src/connection/tcp/mod.rs     |   4 +-
 core/sdk/src/connection/tcp/tcp.rs     |  18 +++-
 core/sdk/src/connection/tcp/tls.rs     | 102 +++++++++++------------
 core/sdk/src/lib.rs                    |   1 +
 core/sdk/src/proto/mod.rs              |  18 ----
 core/sdk/src/proto/tcp_adapter.rs      |  31 -------
 core/sdk/src/quic/mod.rs               |   2 +-
 core/sdk/src/transport_adapter/mod.rs  |  10 +++
 core/sdk/src/transport_adapter/quic.rs |  42 ++++++++++
 10 files changed, 252 insertions(+), 122 deletions(-)

diff --git a/core/sdk/src/connection/quic/mod.rs 
b/core/sdk/src/connection/quic/mod.rs
index 804f0b71..c5d9feed 100644
--- a/core/sdk/src/connection/quic/mod.rs
+++ b/core/sdk/src/connection/quic/mod.rs
@@ -1,27 +1,141 @@
 use std::sync::Arc;
+use std::{io, net::SocketAddr, pin::Pin, time::Duration};
+use iggy_common::{IggyError, QuicClientConfig};
+use rustls::crypto::CryptoProvider;
+use tracing::{error, warn};
+use crate::quic::skip_server_verification::SkipServerVerification;
 
-use iggy_common::QuicClientConfig;
-use quinn::{ClientConfig, Endpoint};
+use quinn::crypto::rustls::QuicClientConfig as QuinnQuicClientConfig;
+use quinn::{ClientConfig, Connection, Endpoint, IdleTimeout, VarInt};
 
-use crate::connection::SocketFactory;
-
-pub struct QuicFactory {
-    config: ClientConfig,
-    quic_config: Arc<QuicClientConfig>, // rename to QuicFabricConfig
+pub trait QuicFactory {
+    type Conn;
+    fn connect(&self) -> Pin<Box<dyn Future<Output = Result<Self::Conn, 
IggyError>> + Send>>;
 }
 
-impl SocketFactory for QuicFactory {
-    type Stream = Arc<quinn::Connection>;
+pub struct QuinnFactory {
+    config: Arc<QuicClientConfig>,
+    ep: Arc<Endpoint>,
+    server_address: SocketAddr,
+}
 
-    fn connect(&self, addr: std::net::SocketAddr) -> std::pin::Pin<Box<dyn 
Future<Output = std::io::Result<Self::Stream>> + Send>> {
-        let endpoint = Endpoint::client(addr)?;
-        let config = self.config.clone();
-        endpoint.set_default_client_config(config);
+impl QuicFactory for QuinnFactory {
+    type Conn = Connection;
 
-        let connecting = endpoint.connect(addr, 
&self.quic_config.server_name)?;
+    fn connect(&self) -> Pin<Box<dyn Future<Output = Result<Self::Conn, 
IggyError>> + Send>> {
+        let ep  = self.ep.clone();
+        let sn  = self.config.server_name.clone();
+        let sa = self.server_address.clone();
         Box::pin(async move {
-            let connection = connecting.await?;
-            Ok(stream)
+            let connecting = ep
+                .connect(sa, &sn)
+                .map_err(|_| IggyError::CannotEstablishConnection)?;
+
+            connecting
+                .await
+                .map_err(|_| IggyError::CannotEstablishConnection)
         })
     }
 }
+
+impl QuinnFactory {
+    pub fn new(config: QuicClientConfig) -> Result<Self, IggyError> {
+        let cfg = Arc::new(config);
+
+        let server_address = cfg
+            .server_address
+            .parse::<SocketAddr>()
+            .map_err(|error| {
+                error!("Invalid server address: {error}");
+                IggyError::InvalidServerAddress
+            })?;
+        let client_address = if server_address.is_ipv6()
+            && cfg.client_address == QuicClientConfig::default().client_address
+        {
+            "[::1]:0"
+        } else {
+            &cfg.client_address
+        }
+        .parse::<SocketAddr>()
+        .map_err(|error| {
+            error!("Invalid client address: {error}");
+            IggyError::InvalidClientAddress
+        })?;
+
+        let quic_config = configure(&cfg)?;
+        let endpoint = Endpoint::client(client_address);
+        if endpoint.is_err() {
+            error!("Cannot create client endpoint");
+            return Err(IggyError::CannotCreateEndpoint);
+        }
+
+        let mut endpoint = endpoint.unwrap();
+        endpoint.set_default_client_config(quic_config);
+
+        Ok(Self { config: cfg, ep: Arc::new(endpoint), server_address })
+    }
+}
+
+fn configure(config: &QuicClientConfig) -> Result<ClientConfig, IggyError> {
+    let max_concurrent_bidi_streams = 
VarInt::try_from(config.max_concurrent_bidi_streams);
+    if max_concurrent_bidi_streams.is_err() {
+        error!(
+            "Invalid 'max_concurrent_bidi_streams': {}",
+            config.max_concurrent_bidi_streams
+        );
+        return Err(IggyError::InvalidConfiguration);
+    }
+
+    let receive_window = VarInt::try_from(config.receive_window);
+    if receive_window.is_err() {
+        error!("Invalid 'receive_window': {}", config.receive_window);
+        return Err(IggyError::InvalidConfiguration);
+    }
+
+    let mut transport = quinn::TransportConfig::default();
+    transport.initial_mtu(config.initial_mtu);
+    transport.send_window(config.send_window);
+    transport.receive_window(receive_window.unwrap());
+    transport.datagram_send_buffer_size(config.datagram_send_buffer_size as 
usize);
+    
transport.max_concurrent_bidi_streams(max_concurrent_bidi_streams.unwrap());
+    if config.keep_alive_interval > 0 {
+        
transport.keep_alive_interval(Some(Duration::from_millis(config.keep_alive_interval)));
+    }
+    if config.max_idle_timeout > 0 {
+        let max_idle_timeout =
+            
IdleTimeout::try_from(Duration::from_millis(config.max_idle_timeout));
+        if max_idle_timeout.is_err() {
+            error!("Invalid 'max_idle_timeout': {}", config.max_idle_timeout);
+            return Err(IggyError::InvalidConfiguration);
+        }
+        transport.max_idle_timeout(Some(max_idle_timeout.unwrap()));
+    }
+
+    if CryptoProvider::get_default().is_none() {
+        if let Err(e) = 
rustls::crypto::ring::default_provider().install_default() {
+            warn!(
+                "Failed to install rustls crypto provider. Error: {:?}. This 
may be normal if another thread installed it first.",
+                e
+            );
+        }
+    }
+    let mut client_config = match config.validate_certificate {
+        true => ClientConfig::with_platform_verifier(),
+        false => {
+            match QuinnQuicClientConfig::try_from(
+                rustls::ClientConfig::builder()
+                    .dangerous()
+                    
.with_custom_certificate_verifier(SkipServerVerification::new())
+                    .with_no_client_auth(),
+            ) {
+                Ok(config) => ClientConfig::new(Arc::new(config)),
+                Err(error) => {
+                    error!("Failed to create QUIC client configuration: 
{error}");
+                    return Err(IggyError::InvalidConfiguration);
+                }
+            }
+        }
+    };
+    client_config.transport_config(Arc::new(transport));
+    Ok(client_config)
+}
diff --git a/core/sdk/src/connection/tcp/mod.rs 
b/core/sdk/src/connection/tcp/mod.rs
index ec1d2deb..3d133a45 100644
--- a/core/sdk/src/connection/tcp/mod.rs
+++ b/core/sdk/src/connection/tcp/mod.rs
@@ -7,8 +7,8 @@ use futures::{AsyncRead, AsyncWrite};
 pub trait AsyncStream: AsyncRead + AsyncWrite + Unpin + Send + 'static {}
 impl<T> AsyncStream for T where T: AsyncRead + AsyncWrite + Unpin + Send + 
'static {}
 
-pub trait SocketFactory { // TODO rename to something common
+pub trait SocketFactory {
     type Stream: AsyncStream;
 
-    fn connect(&self, addr: SocketAddr) -> Pin<Box<dyn Future<Output = 
io::Result<Self::Stream>> + Send>>;
+    fn connect(&self) -> Pin<Box<dyn Future<Output = io::Result<Self::Stream>> 
+ Send>>;
 }
diff --git a/core/sdk/src/connection/tcp/tcp.rs 
b/core/sdk/src/connection/tcp/tcp.rs
index 561a81d7..d511867a 100644
--- a/core/sdk/src/connection/tcp/tcp.rs
+++ b/core/sdk/src/connection/tcp/tcp.rs
@@ -1,18 +1,30 @@
+use std::sync::Arc;
+
 use crate::connection::tcp::SocketFactory;
+use iggy_common::TcpClientConfig;
 use tokio_util::compat::TokioAsyncReadCompatExt;
 
 pub type TokioCompatStream = tokio_util::compat::Compat<tokio::net::TcpStream>;
 
-pub struct TokioTcpFactory;
+pub struct TokioTcpFactory {
+    config: Arc<TcpClientConfig>,
+}
 
 impl SocketFactory for TokioTcpFactory {
     type Stream = TokioCompatStream;
 
-    fn connect(&self, addr: std::net::SocketAddr) -> std::pin::Pin<Box<dyn 
Future<Output = std::io::Result<Self::Stream>> + Send>> {
+    fn connect(&self) -> std::pin::Pin<Box<dyn Future<Output = 
std::io::Result<Self::Stream>> + Send>> {
+        let sa = self.config.server_address.clone();
         Box::pin(async move {
-            let stream = tokio::net::TcpStream::connect(addr).await?;
+            let stream = tokio::net::TcpStream::connect(sa).await?;
             stream.set_nodelay(true)?;
             Ok(stream.compat())
         })
     }
 }
+
+impl TokioTcpFactory {
+    pub fn new(config: TcpClientConfig) -> Self {
+        Self { config: Arc::new(config) }
+    }
+}
diff --git a/core/sdk/src/connection/tcp/tls.rs 
b/core/sdk/src/connection/tcp/tls.rs
index 624b09e2..a6949067 100644
--- a/core/sdk/src/connection/tcp/tls.rs
+++ b/core/sdk/src/connection/tcp/tls.rs
@@ -5,54 +5,54 @@ use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
 
 use crate::connection::tcp::{AsyncStream, SocketFactory};
 
-pub trait TlsConnector<Plain: AsyncStream> {
-    type TlsStream: AsyncStream;
-
-    fn handshake(
-        &self,
-        plain: Plain,
-        domain: &str,
-    ) -> Pin<Box<dyn Future<Output = io::Result<Self::TlsStream>> + Send>>;
-}
-
-pub struct TokioRustls {
-    inner: tokio_rustls::TlsConnector,
-}
-impl TlsConnector<Compat<tokio::net::TcpStream>> for TokioRustls {
-    type TlsStream = 
Compat<tokio_rustls::client::TlsStream<tokio::net::TcpStream>>;
-
-    fn handshake(
-        &self,
-        plain: Compat<tokio::net::TcpStream>,
-        domain: &str,
-    ) -> Pin<Box<dyn Future<Output = std::io::Result<Self::TlsStream>> + 
Send>> {
-        let dom = ServerName::try_from(domain.to_owned()).unwrap();
-        let tcp = plain.into_inner();
-        let fut = self.inner.connect(dom, tcp);
-        Box::pin(async move { Ok(fut.await?.compat()) })
-    }
-}
-
-pub struct TlsFactory<F, C> {
-    base: F,
-    tls:  C,
-    domain: String,
-}
-
-impl<F, C> SocketFactory for TlsFactory<F, C>
-where
-    F: SocketFactory,
-    C: TlsConnector<F::Stream> + Clone + Send + Sync + 'static,
-{
-    type Stream = C::TlsStream;
-
-    fn connect(&self, addr: SocketAddr) -> Pin<Box<dyn Future<Output = 
io::Result<Self::Stream>> + Send>> {
-        let base = self.base.connect(addr);
-        let tls  = self.tls.clone();
-        let dom  = self.domain.clone();
-        Box::pin(async move {
-            let plain = base.await?;
-            tls.handshake(plain, &dom).await
-        })
-    }
-}
+// pub trait TlsConnector<Plain: AsyncStream> {
+//     type TlsStream: AsyncStream;
+
+//     fn handshake(
+//         &self,
+//         plain: Plain,
+//         domain: &str,
+//     ) -> Pin<Box<dyn Future<Output = io::Result<Self::TlsStream>> + Send>>;
+// }
+
+// pub struct TokioRustls {
+//     inner: tokio_rustls::TlsConnector,
+// }
+// impl TlsConnector<Compat<tokio::net::TcpStream>> for TokioRustls {
+//     type TlsStream = 
Compat<tokio_rustls::client::TlsStream<tokio::net::TcpStream>>;
+
+//     fn handshake(
+//         &self,
+//         plain: Compat<tokio::net::TcpStream>,
+//         domain: &str,
+//     ) -> Pin<Box<dyn Future<Output = std::io::Result<Self::TlsStream>> + 
Send>> {
+//         let dom = ServerName::try_from(domain.to_owned()).unwrap();
+//         let tcp = plain.into_inner();
+//         let fut = self.inner.connect(dom, tcp);
+//         Box::pin(async move { Ok(fut.await?.compat()) })
+//     }
+// }
+
+// pub struct TlsFactory<F, C> {
+//     base: F,
+//     tls:  C,
+//     domain: String,
+// }
+
+// impl<F, C> SocketFactory for TlsFactory<F, C>
+// where
+//     F: SocketFactory,
+//     C: TlsConnector<F::Stream> + Clone + Send + Sync + 'static,
+// {
+//     type Stream = C::TlsStream;
+
+//     fn connect(&self) -> Pin<Box<dyn Future<Output = 
io::Result<Self::Stream>> + Send>> {
+//         let base = self.base.connect(addr);
+//         let tls  = self.tls.clone();
+//         let dom  = self.domain.clone();
+//         Box::pin(async move {
+//             let plain = base.await?;
+//             tls.handshake(plain, &dom).await
+//         })
+//     }
+// }
diff --git a/core/sdk/src/lib.rs b/core/sdk/src/lib.rs
index 87efe42c..e27a3554 100644
--- a/core/sdk/src/lib.rs
+++ b/core/sdk/src/lib.rs
@@ -29,3 +29,4 @@ pub mod tcp;
 pub mod proto;
 pub mod transport_factory;
 pub mod connection;
+pub mod transport_adapter;
diff --git a/core/sdk/src/proto/mod.rs b/core/sdk/src/proto/mod.rs
index 478f6fac..62d0aca6 100644
--- a/core/sdk/src/proto/mod.rs
+++ b/core/sdk/src/proto/mod.rs
@@ -1,20 +1,2 @@
 pub mod connection;
-pub mod tcp_adapter;
 pub mod runtime;
-
-use std::collections::VecDeque;
-
-use bytes::Bytes;
-use iggy_common::ClientState;
-
-pub struct IggyCore {
-    buffer: VecDeque<Bytes>,
-    current_state: ClientState,
-}
-
-impl IggyCore {
-    pub fn write(&mut self, payload: Bytes) {
-        self.buffer.push_back(payload)
-    }
-
-}
diff --git a/core/sdk/src/proto/tcp_adapter.rs 
b/core/sdk/src/proto/tcp_adapter.rs
deleted file mode 100644
index 41edd05d..00000000
--- a/core/sdk/src/proto/tcp_adapter.rs
+++ /dev/null
@@ -1,31 +0,0 @@
-use std::{sync::Mutex, task::Context};
-
-use bytes::Bytes;
-use futures::future::poll_fn;
-use iggy_common::{Command, IggyError};
-
-use crate::proto::{connection::IggyCore, runtime::{Lockable, Runtime}};
-use crate::tcp::tcp_connection_stream_kind::ConnectionStreamKind;
-
-pub struct TCPAdapter<R: Runtime> {
-    rt: R,
-    core: Mutex<IggyCore>,
-    pub(crate) stream: R::Mutex<Option<ConnectionStreamKind>>,
-}
-
-impl<R: Runtime> TCPAdapter<R> {
-    pub async fn send_with_response(&self, command: &impl Command) -> 
Result<Bytes, IggyError> {
-        // poll_fn(|cx| )
-        Ok(Bytes::new())
-    }
-
-    fn write(&self, command: &impl Command) -> Result<(), IggyError> {
-        command.validate()?;
-        self.core.lock().unwrap().write(command)
-    }
-
-    fn execute_poll(&mut self, cx: &mut Context, command: &impl Command) -> 
Result<Bytes, IggyError> {
-        self.write(command)?;
-        
-    }
-}
diff --git a/core/sdk/src/quic/mod.rs b/core/sdk/src/quic/mod.rs
index 0ffce248..c67398e0 100644
--- a/core/sdk/src/quic/mod.rs
+++ b/core/sdk/src/quic/mod.rs
@@ -17,4 +17,4 @@
  */
 
 pub mod quick_client;
-mod skip_server_verification;
+pub mod skip_server_verification;
diff --git a/core/sdk/src/transport_adapter/mod.rs 
b/core/sdk/src/transport_adapter/mod.rs
new file mode 100644
index 00000000..ee44fefc
--- /dev/null
+++ b/core/sdk/src/transport_adapter/mod.rs
@@ -0,0 +1,10 @@
+pub mod quic;
+
+use std::pin::Pin;
+
+use bytes::Bytes;
+use iggy_common::{Command, IggyError};
+
+pub trait Driver {
+    fn send_with_response<'a, T: Command>(&'a self, command: &'a T) -> 
Pin<Box<dyn Future<Output = Result<Bytes, IggyError>> + Send + 'a>>;
+}
diff --git a/core/sdk/src/transport_adapter/quic.rs 
b/core/sdk/src/transport_adapter/quic.rs
new file mode 100644
index 00000000..574cfaec
--- /dev/null
+++ b/core/sdk/src/transport_adapter/quic.rs
@@ -0,0 +1,42 @@
+use std::{pin::Pin, sync::{Arc, Mutex}};
+
+use bytes::Bytes;
+
+use crate::{connection::quic::QuicFactory, proto::{connection::IggyCore, 
runtime::{Lockable, Runtime}}, transport_adapter::Driver};
+
+pub struct QuicAdapter<F: QuicFactory, R: Runtime> {
+    factory: Arc<F>,
+    rt: Arc<R>,
+    core: R::Mutex<IggyCore>,
+}
+
+impl<F: QuicFactory + Send + Sync + 'static, R: Runtime> Driver for 
QuicAdapter<F, R> {
+    fn send_with_response<'a, T: iggy_common::Command>(&'a self, command: &'a 
T) -> Pin<Box<dyn Future<Output = Result<bytes::Bytes, iggy_common::IggyError>> 
+ Send + 'a>> {
+        Box::pin(async move {
+            self.core.lock().await.write(command)?;
+
+            loop {
+                // Poll the core to check whether the driver task has written new data into it,
+                // i.e. call poll in a loop (or use wake). To keep the data from getting mixed up we
+                // can store a serial id here at write time, so the driver knows the id and this side knows what to wait for.
+                // The driver would then write payload: Bytes + id: uint64.
+            }
+
+            Ok(Bytes::new())
+        })
+    }
+}
+
+
+
+/*
+// TODO: for the transport
+    fn send_with_response<'a, T: iggy_common::Command>(&'a self, command: &'a 
T) -> Pin<Box<dyn Future<Output = Result<bytes::Bytes, iggy_common::IggyError>> 
+ Send + 'a>> {
+        Box::pin(async move {
+            self.core.lock().await.write(command)?;
+
+            Ok(Bytes::new())
+        })
+    }
+
+*/
\ No newline at end of file

Reply via email to