This is an automated email from the ASF dual-hosted git repository.
bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/draft-sans by this push:
new 178f9c19 del
178f9c19 is described below
commit 178f9c19cf4aa716c2e3f8a74921dd0b850efe42
Author: haze518 <[email protected]>
AuthorDate: Sat Jul 5 07:34:24 2025 +0600
del
---
core/sdk/src/connection/quic/mod.rs | 146 +++++++++++++++++++++++++++++----
core/sdk/src/connection/tcp/mod.rs | 4 +-
core/sdk/src/connection/tcp/tcp.rs | 18 +++-
core/sdk/src/connection/tcp/tls.rs | 102 +++++++++++------------
core/sdk/src/lib.rs | 1 +
core/sdk/src/proto/mod.rs | 18 ----
core/sdk/src/proto/tcp_adapter.rs | 31 -------
core/sdk/src/quic/mod.rs | 2 +-
core/sdk/src/transport_adapter/mod.rs | 10 +++
core/sdk/src/transport_adapter/quic.rs | 42 ++++++++++
10 files changed, 252 insertions(+), 122 deletions(-)
diff --git a/core/sdk/src/connection/quic/mod.rs
b/core/sdk/src/connection/quic/mod.rs
index 804f0b71..c5d9feed 100644
--- a/core/sdk/src/connection/quic/mod.rs
+++ b/core/sdk/src/connection/quic/mod.rs
@@ -1,27 +1,141 @@
use std::sync::Arc;
+use std::{io, net::SocketAddr, pin::Pin, time::Duration};
+use iggy_common::{IggyError, QuicClientConfig};
+use rustls::crypto::CryptoProvider;
+use tracing::{error, warn};
+use crate::quic::skip_server_verification::SkipServerVerification;
-use iggy_common::QuicClientConfig;
-use quinn::{ClientConfig, Endpoint};
+use quinn::crypto::rustls::QuicClientConfig as QuinnQuicClientConfig;
+use quinn::{ClientConfig, Connection, Endpoint, IdleTimeout, VarInt};
-use crate::connection::SocketFactory;
-
-pub struct QuicFactory {
- config: ClientConfig,
- quic_config: Arc<QuicClientConfig>, // rename to QuicFabricConfig
+pub trait QuicFactory {
+ type Conn;
+ fn connect(&self) -> Pin<Box<dyn Future<Output = Result<Self::Conn,
IggyError>> + Send>>;
}
-impl SocketFactory for QuicFactory {
- type Stream = Arc<quinn::Connection>;
+pub struct QuinnFactory {
+ config: Arc<QuicClientConfig>,
+ ep: Arc<Endpoint>,
+ server_address: SocketAddr,
+}
- fn connect(&self, addr: std::net::SocketAddr) -> std::pin::Pin<Box<dyn
Future<Output = std::io::Result<Self::Stream>> + Send>> {
- let endpoint = Endpoint::client(addr)?;
- let config = self.config.clone();
- endpoint.set_default_client_config(config);
+impl QuicFactory for QuinnFactory {
+ type Conn = Connection;
- let connecting = endpoint.connect(addr,
&self.quic_config.server_name)?;
+ fn connect(&self) -> Pin<Box<dyn Future<Output = Result<Self::Conn,
IggyError>> + Send>> {
+ let ep = self.ep.clone();
+ let sn = self.config.server_name.clone();
+ let sa = self.server_address.clone();
Box::pin(async move {
- let connection = connecting.await?;
- Ok(stream)
+ let connecting = ep
+ .connect(sa, &sn)
+ .map_err(|_| IggyError::CannotEstablishConnection)?;
+
+ connecting
+ .await
+ .map_err(|_| IggyError::CannotEstablishConnection)
})
}
}
+
+impl QuinnFactory {
+ pub fn new(config: QuicClientConfig) -> Result<Self, IggyError> {
+ let cfg = Arc::new(config);
+
+ let server_address = cfg
+ .server_address
+ .parse::<SocketAddr>()
+ .map_err(|error| {
+ error!("Invalid server address: {error}");
+ IggyError::InvalidServerAddress
+ })?;
+ let client_address = if server_address.is_ipv6()
+ && cfg.client_address == QuicClientConfig::default().client_address
+ {
+ "[::1]:0"
+ } else {
+ &cfg.client_address
+ }
+ .parse::<SocketAddr>()
+ .map_err(|error| {
+ error!("Invalid client address: {error}");
+ IggyError::InvalidClientAddress
+ })?;
+
+ let quic_config = configure(&cfg)?;
+ let endpoint = Endpoint::client(client_address);
+ if endpoint.is_err() {
+ error!("Cannot create client endpoint");
+ return Err(IggyError::CannotCreateEndpoint);
+ }
+
+ let mut endpoint = endpoint.unwrap();
+ endpoint.set_default_client_config(quic_config);
+
+ Ok(Self { config: cfg, ep: Arc::new(endpoint), server_address })
+ }
+}
+
+fn configure(config: &QuicClientConfig) -> Result<ClientConfig, IggyError> {
+ let max_concurrent_bidi_streams =
VarInt::try_from(config.max_concurrent_bidi_streams);
+ if max_concurrent_bidi_streams.is_err() {
+ error!(
+ "Invalid 'max_concurrent_bidi_streams': {}",
+ config.max_concurrent_bidi_streams
+ );
+ return Err(IggyError::InvalidConfiguration);
+ }
+
+ let receive_window = VarInt::try_from(config.receive_window);
+ if receive_window.is_err() {
+ error!("Invalid 'receive_window': {}", config.receive_window);
+ return Err(IggyError::InvalidConfiguration);
+ }
+
+ let mut transport = quinn::TransportConfig::default();
+ transport.initial_mtu(config.initial_mtu);
+ transport.send_window(config.send_window);
+ transport.receive_window(receive_window.unwrap());
+ transport.datagram_send_buffer_size(config.datagram_send_buffer_size as
usize);
+
transport.max_concurrent_bidi_streams(max_concurrent_bidi_streams.unwrap());
+ if config.keep_alive_interval > 0 {
+
transport.keep_alive_interval(Some(Duration::from_millis(config.keep_alive_interval)));
+ }
+ if config.max_idle_timeout > 0 {
+ let max_idle_timeout =
+
IdleTimeout::try_from(Duration::from_millis(config.max_idle_timeout));
+ if max_idle_timeout.is_err() {
+ error!("Invalid 'max_idle_timeout': {}", config.max_idle_timeout);
+ return Err(IggyError::InvalidConfiguration);
+ }
+ transport.max_idle_timeout(Some(max_idle_timeout.unwrap()));
+ }
+
+ if CryptoProvider::get_default().is_none() {
+ if let Err(e) =
rustls::crypto::ring::default_provider().install_default() {
+ warn!(
+ "Failed to install rustls crypto provider. Error: {:?}. This
may be normal if another thread installed it first.",
+ e
+ );
+ }
+ }
+ let mut client_config = match config.validate_certificate {
+ true => ClientConfig::with_platform_verifier(),
+ false => {
+ match QuinnQuicClientConfig::try_from(
+ rustls::ClientConfig::builder()
+ .dangerous()
+
.with_custom_certificate_verifier(SkipServerVerification::new())
+ .with_no_client_auth(),
+ ) {
+ Ok(config) => ClientConfig::new(Arc::new(config)),
+ Err(error) => {
+ error!("Failed to create QUIC client configuration:
{error}");
+ return Err(IggyError::InvalidConfiguration);
+ }
+ }
+ }
+ };
+ client_config.transport_config(Arc::new(transport));
+ Ok(client_config)
+}
diff --git a/core/sdk/src/connection/tcp/mod.rs
b/core/sdk/src/connection/tcp/mod.rs
index ec1d2deb..3d133a45 100644
--- a/core/sdk/src/connection/tcp/mod.rs
+++ b/core/sdk/src/connection/tcp/mod.rs
@@ -7,8 +7,8 @@ use futures::{AsyncRead, AsyncWrite};
pub trait AsyncStream: AsyncRead + AsyncWrite + Unpin + Send + 'static {}
impl<T> AsyncStream for T where T: AsyncRead + AsyncWrite + Unpin + Send +
'static {}
-pub trait SocketFactory { // TODO rename to something common
+pub trait SocketFactory {
type Stream: AsyncStream;
- fn connect(&self, addr: SocketAddr) -> Pin<Box<dyn Future<Output =
io::Result<Self::Stream>> + Send>>;
+ fn connect(&self) -> Pin<Box<dyn Future<Output = io::Result<Self::Stream>>
+ Send>>;
}
diff --git a/core/sdk/src/connection/tcp/tcp.rs
b/core/sdk/src/connection/tcp/tcp.rs
index 561a81d7..d511867a 100644
--- a/core/sdk/src/connection/tcp/tcp.rs
+++ b/core/sdk/src/connection/tcp/tcp.rs
@@ -1,18 +1,30 @@
+use std::sync::Arc;
+
use crate::connection::tcp::SocketFactory;
+use iggy_common::TcpClientConfig;
use tokio_util::compat::TokioAsyncReadCompatExt;
pub type TokioCompatStream = tokio_util::compat::Compat<tokio::net::TcpStream>;
-pub struct TokioTcpFactory;
+pub struct TokioTcpFactory {
+ config: Arc<TcpClientConfig>,
+}
impl SocketFactory for TokioTcpFactory {
type Stream = TokioCompatStream;
- fn connect(&self, addr: std::net::SocketAddr) -> std::pin::Pin<Box<dyn
Future<Output = std::io::Result<Self::Stream>> + Send>> {
+ fn connect(&self) -> std::pin::Pin<Box<dyn Future<Output =
std::io::Result<Self::Stream>> + Send>> {
+ let sa = self.config.server_address.clone();
Box::pin(async move {
- let stream = tokio::net::TcpStream::connect(addr).await?;
+ let stream = tokio::net::TcpStream::connect(sa).await?;
stream.set_nodelay(true)?;
Ok(stream.compat())
})
}
}
+
+impl TokioTcpFactory {
+ pub fn new(config: TcpClientConfig) -> Self {
+ Self { config: Arc::new(config) }
+ }
+}
diff --git a/core/sdk/src/connection/tcp/tls.rs
b/core/sdk/src/connection/tcp/tls.rs
index 624b09e2..a6949067 100644
--- a/core/sdk/src/connection/tcp/tls.rs
+++ b/core/sdk/src/connection/tcp/tls.rs
@@ -5,54 +5,54 @@ use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
use crate::connection::tcp::{AsyncStream, SocketFactory};
-pub trait TlsConnector<Plain: AsyncStream> {
- type TlsStream: AsyncStream;
-
- fn handshake(
- &self,
- plain: Plain,
- domain: &str,
- ) -> Pin<Box<dyn Future<Output = io::Result<Self::TlsStream>> + Send>>;
-}
-
-pub struct TokioRustls {
- inner: tokio_rustls::TlsConnector,
-}
-impl TlsConnector<Compat<tokio::net::TcpStream>> for TokioRustls {
- type TlsStream =
Compat<tokio_rustls::client::TlsStream<tokio::net::TcpStream>>;
-
- fn handshake(
- &self,
- plain: Compat<tokio::net::TcpStream>,
- domain: &str,
- ) -> Pin<Box<dyn Future<Output = std::io::Result<Self::TlsStream>> +
Send>> {
- let dom = ServerName::try_from(domain.to_owned()).unwrap();
- let tcp = plain.into_inner();
- let fut = self.inner.connect(dom, tcp);
- Box::pin(async move { Ok(fut.await?.compat()) })
- }
-}
-
-pub struct TlsFactory<F, C> {
- base: F,
- tls: C,
- domain: String,
-}
-
-impl<F, C> SocketFactory for TlsFactory<F, C>
-where
- F: SocketFactory,
- C: TlsConnector<F::Stream> + Clone + Send + Sync + 'static,
-{
- type Stream = C::TlsStream;
-
- fn connect(&self, addr: SocketAddr) -> Pin<Box<dyn Future<Output =
io::Result<Self::Stream>> + Send>> {
- let base = self.base.connect(addr);
- let tls = self.tls.clone();
- let dom = self.domain.clone();
- Box::pin(async move {
- let plain = base.await?;
- tls.handshake(plain, &dom).await
- })
- }
-}
+// pub trait TlsConnector<Plain: AsyncStream> {
+// type TlsStream: AsyncStream;
+
+// fn handshake(
+// &self,
+// plain: Plain,
+// domain: &str,
+// ) -> Pin<Box<dyn Future<Output = io::Result<Self::TlsStream>> + Send>>;
+// }
+
+// pub struct TokioRustls {
+// inner: tokio_rustls::TlsConnector,
+// }
+// impl TlsConnector<Compat<tokio::net::TcpStream>> for TokioRustls {
+// type TlsStream =
Compat<tokio_rustls::client::TlsStream<tokio::net::TcpStream>>;
+
+// fn handshake(
+// &self,
+// plain: Compat<tokio::net::TcpStream>,
+// domain: &str,
+// ) -> Pin<Box<dyn Future<Output = std::io::Result<Self::TlsStream>> +
Send>> {
+// let dom = ServerName::try_from(domain.to_owned()).unwrap();
+// let tcp = plain.into_inner();
+// let fut = self.inner.connect(dom, tcp);
+// Box::pin(async move { Ok(fut.await?.compat()) })
+// }
+// }
+
+// pub struct TlsFactory<F, C> {
+// base: F,
+// tls: C,
+// domain: String,
+// }
+
+// impl<F, C> SocketFactory for TlsFactory<F, C>
+// where
+// F: SocketFactory,
+// C: TlsConnector<F::Stream> + Clone + Send + Sync + 'static,
+// {
+// type Stream = C::TlsStream;
+
+// fn connect(&self) -> Pin<Box<dyn Future<Output =
io::Result<Self::Stream>> + Send>> {
+// let base = self.base.connect(addr);
+// let tls = self.tls.clone();
+// let dom = self.domain.clone();
+// Box::pin(async move {
+// let plain = base.await?;
+// tls.handshake(plain, &dom).await
+// })
+// }
+// }
diff --git a/core/sdk/src/lib.rs b/core/sdk/src/lib.rs
index 87efe42c..e27a3554 100644
--- a/core/sdk/src/lib.rs
+++ b/core/sdk/src/lib.rs
@@ -29,3 +29,4 @@ pub mod tcp;
pub mod proto;
pub mod transport_factory;
pub mod connection;
+pub mod transport_adapter;
diff --git a/core/sdk/src/proto/mod.rs b/core/sdk/src/proto/mod.rs
index 478f6fac..62d0aca6 100644
--- a/core/sdk/src/proto/mod.rs
+++ b/core/sdk/src/proto/mod.rs
@@ -1,20 +1,2 @@
pub mod connection;
-pub mod tcp_adapter;
pub mod runtime;
-
-use std::collections::VecDeque;
-
-use bytes::Bytes;
-use iggy_common::ClientState;
-
-pub struct IggyCore {
- buffer: VecDeque<Bytes>,
- current_state: ClientState,
-}
-
-impl IggyCore {
- pub fn write(&mut self, payload: Bytes) {
- self.buffer.push_back(payload)
- }
-
-}
diff --git a/core/sdk/src/proto/tcp_adapter.rs
b/core/sdk/src/proto/tcp_adapter.rs
deleted file mode 100644
index 41edd05d..00000000
--- a/core/sdk/src/proto/tcp_adapter.rs
+++ /dev/null
@@ -1,31 +0,0 @@
-use std::{sync::Mutex, task::Context};
-
-use bytes::Bytes;
-use futures::future::poll_fn;
-use iggy_common::{Command, IggyError};
-
-use crate::proto::{connection::IggyCore, runtime::{Lockable, Runtime}};
-use crate::tcp::tcp_connection_stream_kind::ConnectionStreamKind;
-
-pub struct TCPAdapter<R: Runtime> {
- rt: R,
- core: Mutex<IggyCore>,
- pub(crate) stream: R::Mutex<Option<ConnectionStreamKind>>,
-}
-
-impl<R: Runtime> TCPAdapter<R> {
- pub async fn send_with_response(&self, command: &impl Command) ->
Result<Bytes, IggyError> {
- // poll_fn(|cx| )
- Ok(Bytes::new())
- }
-
- fn write(&self, command: &impl Command) -> Result<(), IggyError> {
- command.validate()?;
- self.core.lock().unwrap().write(command)
- }
-
- fn execute_poll(&mut self, cx: &mut Context, command: &impl Command) ->
Result<Bytes, IggyError> {
- self.write(command)?;
-
- }
-}
diff --git a/core/sdk/src/quic/mod.rs b/core/sdk/src/quic/mod.rs
index 0ffce248..c67398e0 100644
--- a/core/sdk/src/quic/mod.rs
+++ b/core/sdk/src/quic/mod.rs
@@ -17,4 +17,4 @@
*/
pub mod quick_client;
-mod skip_server_verification;
+pub mod skip_server_verification;
diff --git a/core/sdk/src/transport_adapter/mod.rs
b/core/sdk/src/transport_adapter/mod.rs
new file mode 100644
index 00000000..ee44fefc
--- /dev/null
+++ b/core/sdk/src/transport_adapter/mod.rs
@@ -0,0 +1,10 @@
+pub mod quic;
+
+use std::pin::Pin;
+
+use bytes::Bytes;
+use iggy_common::{Command, IggyError};
+
+pub trait Driver {
+ fn send_with_response<'a, T: Command>(&'a self, command: &'a T) ->
Pin<Box<dyn Future<Output = Result<Bytes, IggyError>> + Send + 'a>>;
+}
diff --git a/core/sdk/src/transport_adapter/quic.rs
b/core/sdk/src/transport_adapter/quic.rs
new file mode 100644
index 00000000..574cfaec
--- /dev/null
+++ b/core/sdk/src/transport_adapter/quic.rs
@@ -0,0 +1,42 @@
+use std::{pin::Pin, sync::{Arc, Mutex}};
+
+use bytes::Bytes;
+
+use crate::{connection::quic::QuicFactory, proto::{connection::IggyCore,
runtime::{Lockable, Runtime}}, transport_adapter::Driver};
+
+pub struct QuicAdapter<F: QuicFactory, R: Runtime> {
+ factory: Arc<F>,
+ rt: Arc<R>,
+ core: R::Mutex<IggyCore>,
+}
+
+impl<F: QuicFactory + Send + Sync + 'static, R: Runtime> Driver for
QuicAdapter<F, R> {
+ fn send_with_response<'a, T: iggy_common::Command>(&'a self, command: &'a
T) -> Pin<Box<dyn Future<Output = Result<bytes::Bytes, iggy_common::IggyError>>
+ Send + 'a>> {
+ Box::pin(async move {
+ self.core.lock().await.write(command)?;
+
+ loop {
+ // опрашиваем core на момент того, не записала ли таска
драйвера туда новые данные.
+ // То есть вызываем poll в цикле(или wake). При этом чтобы
данные не перепутались мы
+ // можем сохранять serial id здесь при write, так и драйвер
будет знать id, и тут тоже мы будем понимать что ждать.
+ // То есть тогда драйвер заисывает payload: Bytes + id: uint64
+ }
+
+ Ok(Bytes::new())
+ })
+ }
+}
+
+
+
+/*
+// tood для transprot
+ fn send_with_response<'a, T: iggy_common::Command>(&'a self, command: &'a
T) -> Pin<Box<dyn Future<Output = Result<bytes::Bytes, iggy_common::IggyError>>
+ Send + 'a>> {
+ Box::pin(async move {
+ self.core.lock().await.write(command)?;
+
+ Ok(Bytes::new())
+ })
+ }
+
+*/
\ No newline at end of file