This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/draft-sans by this push:
     new 0073a308 del
0073a308 is described below

commit 0073a308a48f76e523f6eb3bd664a02a5a511ce0
Author: haze518 <[email protected]>
AuthorDate: Fri Jul 4 06:00:52 2025 +0600

    del
---
 Cargo.lock                            |  1 +
 core/sdk/Cargo.toml                   |  1 +
 core/sdk/src/connection/mod.rs        |  2 ++
 core/sdk/src/connection/quic/mod.rs   | 27 ++++++++++++++++
 core/sdk/src/connection/tcp/mod.rs    | 14 +++++++++
 core/sdk/src/connection/tcp/tcp.rs    | 18 +++++++++++
 core/sdk/src/connection/tcp/tls.rs    | 58 +++++++++++++++++++++++++++++++++++
 core/sdk/src/lib.rs                   |  2 ++
 core/sdk/src/transport_factory/mod.rs |  5 +++
 9 files changed, 128 insertions(+)

diff --git a/Cargo.lock b/Cargo.lock
index 0406b044..7bb6fafe 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3707,6 +3707,7 @@ dependencies = [
  "serde",
  "tokio",
  "tokio-rustls",
+ "tokio-util",
  "tracing",
  "trait-variant",
  "webpki-roots 1.0.1",
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index c6b26729..44cf8d2d 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -57,6 +57,7 @@ rustls = { workspace = true }
 serde = { workspace = true }
 tokio = { workspace = true }
 tokio-rustls = { workspace = true }
+tokio-util.workspace = true
 tracing = { workspace = true }
 trait-variant = { workspace = true }
 webpki-roots = { workspace = true }
diff --git a/core/sdk/src/connection/mod.rs b/core/sdk/src/connection/mod.rs
new file mode 100644
index 00000000..85a4c13f
--- /dev/null
+++ b/core/sdk/src/connection/mod.rs
@@ -0,0 +1,2 @@
+pub mod tcp;
+pub mod quic;
diff --git a/core/sdk/src/connection/quic/mod.rs b/core/sdk/src/connection/quic/mod.rs
new file mode 100644
index 00000000..804f0b71
--- /dev/null
+++ b/core/sdk/src/connection/quic/mod.rs
@@ -0,0 +1,27 @@
+use std::sync::Arc;
+
+use iggy_common::QuicClientConfig;
+use quinn::{ClientConfig, Endpoint};
+
+use crate::connection::SocketFactory;
+
+pub struct QuicFactory {
+    config: ClientConfig,
+    quic_config: Arc<QuicClientConfig>, // rename to QuicFabricConfig
+}
+
+impl SocketFactory for QuicFactory {
+    type Stream = Arc<quinn::Connection>;
+
+    fn connect(&self, addr: std::net::SocketAddr) -> std::pin::Pin<Box<dyn Future<Output = std::io::Result<Self::Stream>> + Send>> {
+        let endpoint = Endpoint::client(addr)?;
+        let config = self.config.clone();
+        endpoint.set_default_client_config(config);
+
+        let connecting = endpoint.connect(addr, &self.quic_config.server_name)?;
+        Box::pin(async move {
+            let connection = connecting.await?;
+            Ok(stream)
+        })
+    }
+}
diff --git a/core/sdk/src/connection/tcp/mod.rs b/core/sdk/src/connection/tcp/mod.rs
new file mode 100644
index 00000000..ec1d2deb
--- /dev/null
+++ b/core/sdk/src/connection/tcp/mod.rs
@@ -0,0 +1,14 @@
+pub mod tcp;
+pub mod tls;
+
+use std::{io, net::SocketAddr, pin::Pin};
+use futures::{AsyncRead, AsyncWrite};
+
+pub trait AsyncStream: AsyncRead + AsyncWrite + Unpin + Send + 'static {}
+impl<T> AsyncStream for T where T: AsyncRead + AsyncWrite + Unpin + Send + 'static {}
+
+pub trait SocketFactory { // TODO rename to something common
+    type Stream: AsyncStream;
+
+    fn connect(&self, addr: SocketAddr) -> Pin<Box<dyn Future<Output = io::Result<Self::Stream>> + Send>>;
+}
diff --git a/core/sdk/src/connection/tcp/tcp.rs b/core/sdk/src/connection/tcp/tcp.rs
new file mode 100644
index 00000000..561a81d7
--- /dev/null
+++ b/core/sdk/src/connection/tcp/tcp.rs
@@ -0,0 +1,18 @@
+use crate::connection::tcp::SocketFactory;
+use tokio_util::compat::TokioAsyncReadCompatExt;
+
+pub type TokioCompatStream = tokio_util::compat::Compat<tokio::net::TcpStream>;
+
+pub struct TokioTcpFactory;
+
+impl SocketFactory for TokioTcpFactory {
+    type Stream = TokioCompatStream;
+
+    fn connect(&self, addr: std::net::SocketAddr) -> std::pin::Pin<Box<dyn Future<Output = std::io::Result<Self::Stream>> + Send>> {
+        Box::pin(async move {
+            let stream = tokio::net::TcpStream::connect(addr).await?;
+            stream.set_nodelay(true)?;
+            Ok(stream.compat())
+        })
+    }
+}
diff --git a/core/sdk/src/connection/tcp/tls.rs b/core/sdk/src/connection/tcp/tls.rs
new file mode 100644
index 00000000..624b09e2
--- /dev/null
+++ b/core/sdk/src/connection/tcp/tls.rs
@@ -0,0 +1,58 @@
+use std::{io, net::SocketAddr, pin::Pin};
+
+use rustls::pki_types::ServerName;
+use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
+
+use crate::connection::tcp::{AsyncStream, SocketFactory};
+
+pub trait TlsConnector<Plain: AsyncStream> {
+    type TlsStream: AsyncStream;
+
+    fn handshake(
+        &self,
+        plain: Plain,
+        domain: &str,
+    ) -> Pin<Box<dyn Future<Output = io::Result<Self::TlsStream>> + Send>>;
+}
+
+pub struct TokioRustls {
+    inner: tokio_rustls::TlsConnector,
+}
+impl TlsConnector<Compat<tokio::net::TcpStream>> for TokioRustls {
+    type TlsStream = Compat<tokio_rustls::client::TlsStream<tokio::net::TcpStream>>;
+
+    fn handshake(
+        &self,
+        plain: Compat<tokio::net::TcpStream>,
+        domain: &str,
+    ) -> Pin<Box<dyn Future<Output = std::io::Result<Self::TlsStream>> + Send>> {
+        let dom = ServerName::try_from(domain.to_owned()).unwrap();
+        let tcp = plain.into_inner();
+        let fut = self.inner.connect(dom, tcp);
+        Box::pin(async move { Ok(fut.await?.compat()) })
+    }
+}
+
+pub struct TlsFactory<F, C> {
+    base: F,
+    tls:  C,
+    domain: String,
+}
+
+impl<F, C> SocketFactory for TlsFactory<F, C>
+where
+    F: SocketFactory,
+    C: TlsConnector<F::Stream> + Clone + Send + Sync + 'static,
+{
+    type Stream = C::TlsStream;
+
+    fn connect(&self, addr: SocketAddr) -> Pin<Box<dyn Future<Output = io::Result<Self::Stream>> + Send>> {
+        let base = self.base.connect(addr);
+        let tls  = self.tls.clone();
+        let dom  = self.domain.clone();
+        Box::pin(async move {
+            let plain = base.await?;
+            tls.handshake(plain, &dom).await
+        })
+    }
+}
diff --git a/core/sdk/src/lib.rs b/core/sdk/src/lib.rs
index d1c5397d..87efe42c 100644
--- a/core/sdk/src/lib.rs
+++ b/core/sdk/src/lib.rs
@@ -27,3 +27,5 @@ pub mod quic;
 pub mod stream_builder;
 pub mod tcp;
 pub mod proto;
+pub mod transport_factory;
+pub mod connection;
diff --git a/core/sdk/src/transport_factory/mod.rs b/core/sdk/src/transport_factory/mod.rs
new file mode 100644
index 00000000..fc429407
--- /dev/null
+++ b/core/sdk/src/transport_factory/mod.rs
@@ -0,0 +1,5 @@
+use futures::{AsyncRead, AsyncWrite};
+
+pub trait AsyncStream: AsyncRead + AsyncWrite + Unpin + Send + 'static {}
+impl<T> AsyncStream for T where T: AsyncRead + AsyncWrite + Unpin + Send + 'static {}
+

Reply via email to