This is an automated email from the ASF dual-hosted git repository.
bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/draft-sans by this push:
new 0073a308 del
0073a308 is described below
commit 0073a308a48f76e523f6eb3bd664a02a5a511ce0
Author: haze518 <[email protected]>
AuthorDate: Fri Jul 4 06:00:52 2025 +0600
del
---
Cargo.lock | 1 +
core/sdk/Cargo.toml | 1 +
core/sdk/src/connection/mod.rs | 2 ++
core/sdk/src/connection/quic/mod.rs | 27 ++++++++++++++++
core/sdk/src/connection/tcp/mod.rs | 14 +++++++++
core/sdk/src/connection/tcp/tcp.rs | 18 +++++++++++
core/sdk/src/connection/tcp/tls.rs | 58 +++++++++++++++++++++++++++++++++++
core/sdk/src/lib.rs | 2 ++
core/sdk/src/transport_factory/mod.rs | 5 +++
9 files changed, 128 insertions(+)
diff --git a/Cargo.lock b/Cargo.lock
index 0406b044..7bb6fafe 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3707,6 +3707,7 @@ dependencies = [
"serde",
"tokio",
"tokio-rustls",
+ "tokio-util",
"tracing",
"trait-variant",
"webpki-roots 1.0.1",
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index c6b26729..44cf8d2d 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -57,6 +57,7 @@ rustls = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true }
tokio-rustls = { workspace = true }
+tokio-util.workspace = true
tracing = { workspace = true }
trait-variant = { workspace = true }
webpki-roots = { workspace = true }
diff --git a/core/sdk/src/connection/mod.rs b/core/sdk/src/connection/mod.rs
new file mode 100644
index 00000000..85a4c13f
--- /dev/null
+++ b/core/sdk/src/connection/mod.rs
@@ -0,0 +1,2 @@
+pub mod tcp;
+pub mod quic;
diff --git a/core/sdk/src/connection/quic/mod.rs
b/core/sdk/src/connection/quic/mod.rs
new file mode 100644
index 00000000..804f0b71
--- /dev/null
+++ b/core/sdk/src/connection/quic/mod.rs
@@ -0,0 +1,27 @@
+use std::sync::Arc;
+
+use iggy_common::QuicClientConfig;
+use quinn::{ClientConfig, Endpoint};
+
+use crate::connection::SocketFactory;
+
+pub struct QuicFactory {
+ config: ClientConfig,
+ quic_config: Arc<QuicClientConfig>, // rename to QuicFabricConfig
+}
+
+impl SocketFactory for QuicFactory {
+ type Stream = Arc<quinn::Connection>;
+
+ fn connect(&self, addr: std::net::SocketAddr) -> std::pin::Pin<Box<dyn
Future<Output = std::io::Result<Self::Stream>> + Send>> {
+ let endpoint = Endpoint::client(addr)?;
+ let config = self.config.clone();
+ endpoint.set_default_client_config(config);
+
+ let connecting = endpoint.connect(addr,
&self.quic_config.server_name)?;
+ Box::pin(async move {
+ let connection = connecting.await?;
+ Ok(stream)
+ })
+ }
+}
diff --git a/core/sdk/src/connection/tcp/mod.rs
b/core/sdk/src/connection/tcp/mod.rs
new file mode 100644
index 00000000..ec1d2deb
--- /dev/null
+++ b/core/sdk/src/connection/tcp/mod.rs
@@ -0,0 +1,14 @@
+pub mod tcp;
+pub mod tls;
+
+use std::{io, net::SocketAddr, pin::Pin};
+use futures::{AsyncRead, AsyncWrite};
+
+pub trait AsyncStream: AsyncRead + AsyncWrite + Unpin + Send + 'static {}
+impl<T> AsyncStream for T where T: AsyncRead + AsyncWrite + Unpin + Send +
'static {}
+
+pub trait SocketFactory { // TODO rename to something common
+ type Stream: AsyncStream;
+
+ fn connect(&self, addr: SocketAddr) -> Pin<Box<dyn Future<Output =
io::Result<Self::Stream>> + Send>>;
+}
diff --git a/core/sdk/src/connection/tcp/tcp.rs
b/core/sdk/src/connection/tcp/tcp.rs
new file mode 100644
index 00000000..561a81d7
--- /dev/null
+++ b/core/sdk/src/connection/tcp/tcp.rs
@@ -0,0 +1,18 @@
+use crate::connection::tcp::SocketFactory;
+use tokio_util::compat::TokioAsyncReadCompatExt;
+
+pub type TokioCompatStream = tokio_util::compat::Compat<tokio::net::TcpStream>;
+
+pub struct TokioTcpFactory;
+
+impl SocketFactory for TokioTcpFactory {
+ type Stream = TokioCompatStream;
+
+ fn connect(&self, addr: std::net::SocketAddr) -> std::pin::Pin<Box<dyn
Future<Output = std::io::Result<Self::Stream>> + Send>> {
+ Box::pin(async move {
+ let stream = tokio::net::TcpStream::connect(addr).await?;
+ stream.set_nodelay(true)?;
+ Ok(stream.compat())
+ })
+ }
+}
diff --git a/core/sdk/src/connection/tcp/tls.rs
b/core/sdk/src/connection/tcp/tls.rs
new file mode 100644
index 00000000..624b09e2
--- /dev/null
+++ b/core/sdk/src/connection/tcp/tls.rs
@@ -0,0 +1,58 @@
+use std::{io, net::SocketAddr, pin::Pin};
+
+use rustls::pki_types::ServerName;
+use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
+
+use crate::connection::tcp::{AsyncStream, SocketFactory};
+
+pub trait TlsConnector<Plain: AsyncStream> {
+ type TlsStream: AsyncStream;
+
+ fn handshake(
+ &self,
+ plain: Plain,
+ domain: &str,
+ ) -> Pin<Box<dyn Future<Output = io::Result<Self::TlsStream>> + Send>>;
+}
+
+pub struct TokioRustls {
+ inner: tokio_rustls::TlsConnector,
+}
+impl TlsConnector<Compat<tokio::net::TcpStream>> for TokioRustls {
+ type TlsStream =
Compat<tokio_rustls::client::TlsStream<tokio::net::TcpStream>>;
+
+ fn handshake(
+ &self,
+ plain: Compat<tokio::net::TcpStream>,
+ domain: &str,
+ ) -> Pin<Box<dyn Future<Output = std::io::Result<Self::TlsStream>> +
Send>> {
+ let dom = ServerName::try_from(domain.to_owned()).unwrap();
+ let tcp = plain.into_inner();
+ let fut = self.inner.connect(dom, tcp);
+ Box::pin(async move { Ok(fut.await?.compat()) })
+ }
+}
+
+pub struct TlsFactory<F, C> {
+ base: F,
+ tls: C,
+ domain: String,
+}
+
+impl<F, C> SocketFactory for TlsFactory<F, C>
+where
+ F: SocketFactory,
+ C: TlsConnector<F::Stream> + Clone + Send + Sync + 'static,
+{
+ type Stream = C::TlsStream;
+
+ fn connect(&self, addr: SocketAddr) -> Pin<Box<dyn Future<Output =
io::Result<Self::Stream>> + Send>> {
+ let base = self.base.connect(addr);
+ let tls = self.tls.clone();
+ let dom = self.domain.clone();
+ Box::pin(async move {
+ let plain = base.await?;
+ tls.handshake(plain, &dom).await
+ })
+ }
+}
diff --git a/core/sdk/src/lib.rs b/core/sdk/src/lib.rs
index d1c5397d..87efe42c 100644
--- a/core/sdk/src/lib.rs
+++ b/core/sdk/src/lib.rs
@@ -27,3 +27,5 @@ pub mod quic;
pub mod stream_builder;
pub mod tcp;
pub mod proto;
+pub mod transport_factory;
+pub mod connection;
diff --git a/core/sdk/src/transport_factory/mod.rs
b/core/sdk/src/transport_factory/mod.rs
new file mode 100644
index 00000000..fc429407
--- /dev/null
+++ b/core/sdk/src/transport_factory/mod.rs
@@ -0,0 +1,5 @@
+use futures::{AsyncRead, AsyncWrite};
+
+pub trait AsyncStream: AsyncRead + AsyncWrite + Unpin + Send + 'static {}
+impl<T> AsyncStream for T where T: AsyncRead + AsyncWrite + Unpin + Send +
'static {}
+