This is an automated email from the ASF dual-hosted git repository.
bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/draft-sans by this push:
new d8b48b3f test
d8b48b3f is described below
commit d8b48b3f6137e71d7ce24f5038d4f4f9b93f1182
Author: haze518 <[email protected]>
AuthorDate: Sat Jul 12 19:09:07 2025 +0600
test
---
core/sdk/src/connection/mod.rs | 4 +-
core/sdk/src/connection/quic/mod.rs | 392 ++++++++++++++++++------------------
core/sdk/src/connection/tcp/mod.rs | 52 ++---
core/sdk/src/connection/tcp/tcp.rs | 16 +-
core/sdk/src/driver/mod.rs | 172 ++++++++--------
core/sdk/src/driver/tcp.rs | 23 ++-
6 files changed, 335 insertions(+), 324 deletions(-)
diff --git a/core/sdk/src/connection/mod.rs b/core/sdk/src/connection/mod.rs
index e028ad38..64fa975f 100644
--- a/core/sdk/src/connection/mod.rs
+++ b/core/sdk/src/connection/mod.rs
@@ -19,6 +19,6 @@ pub trait StreamConnectionFactory: ConnectionFactory {
}
pub trait StreamPair: Send {
- fn send_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'_>]) -> Pin<Box<dyn
Future<Output = Result<(), IggyError>> + Send + 'a>>;
- fn read_buf<'a>(&'a mut self, buf: &'a mut BytesMut) -> Pin<Box<dyn
Future<Output = Result<usize, IggyError>> + Send + 'a>>;
+ fn send_vectored<'a>(&'a self, bufs: &'a [IoSlice<'_>]) -> Pin<Box<dyn
Future<Output = Result<(), IggyError>> + Send + 'a>>;
+ fn read_buf<'a>(&'a self, buf: &'a mut BytesMut) -> Pin<Box<dyn
Future<Output = Result<usize, IggyError>> + Send + 'a>>;
}
diff --git a/core/sdk/src/connection/quic/mod.rs
b/core/sdk/src/connection/quic/mod.rs
index 0f58b4a1..3305654e 100644
--- a/core/sdk/src/connection/quic/mod.rs
+++ b/core/sdk/src/connection/quic/mod.rs
@@ -18,199 +18,199 @@ pub struct QuinnStreamPair {
recv: RecvStream,
}
-impl StreamPair for QuinnStreamPair {
- fn send_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'_>]) -> Pin<Box<dyn
Future<Output = Result<(), IggyError>> + Send + 'a>> {
- Box::pin(async move {
- self.send.write_vectored(bufs).await.map_err(|e| {
- error!("Failed to write vectored buffs to quic conn: {e}");
- IggyError::QuicError
- })?;
- self.send.finish().map_err(|e: quinn::ClosedStream| {
- error!("Failed to finish sending data: {e}");
- IggyError::QuicError
- })?;
- Ok(())
- })
- }
-
- fn read_buf<'a>(&'a mut self, mut buf: &'a mut BytesMut) -> Pin<Box<dyn
Future<Output = Result<usize, IggyError>> + Send + 'a>> {
- Box::pin(async move {
- self.recv.read_buf(&mut buf).await.map_err(|e| {
- error!("Failed to read chunk: {e}");
- IggyError::QuicError
- })
- })
- }
-}
-
-pub struct QuinnFactory {
- config: Arc<QuicClientConfig>,
- ep: Arc<Endpoint>,
- connection: Arc<sync::Mutex<Option<Connection>>>,
- server_address: SocketAddr,
-}
-
-impl ConnectionFactory for QuinnFactory {
- fn connect(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> +
Send + Sync>> {
- let ep = self.ep.clone();
- let sn = self.config.server_name.clone();
- let sa = self.server_address.clone();
- let conn = self.connection.clone();
- Box::pin(async move {
- let mut connection = conn.lock().await;
- let connecting = ep
- .connect(sa, &sn)
- .map_err(|_| IggyError::CannotEstablishConnection)?;
-
- let new_conn = connecting
- .await
- .map_err(|_| IggyError::CannotEstablishConnection)?;
- let _ = connection.insert(new_conn);
- Ok(())
- })
- }
-
- fn is_alive(&self) -> Pin<Box<dyn Future<Output = bool> + Send + Sync>> {
- let conn = self.connection.clone();
- Box::pin(async move {
- let conn = conn.lock().await;
- match conn.as_ref() {
- Some(c) => c.close_reason().is_some(),
- None => false,
- }
- })
- }
-
- fn shutdown(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> +
Send + Sync>> {
- let conn = self.connection.clone();
- let ep = self.ep.clone();
- Box::pin(async move {
- if let Some(conn) = conn.lock().await.take() {
- conn.close(0u32.into(), b"");
- }
- ep.wait_idle().await;
- Ok(())
- })
- }
-}
-
-impl StreamConnectionFactory for QuinnFactory {
- type Stream = QuinnStreamPair;
-
- fn open_stream(&self) -> Pin<Box<dyn Future<Output = Result<Self::Stream,
IggyError>> + Send + '_>> {
- let conn = self.connection.clone();
- Box::pin(async move {
- let conn = {
- let guard = conn.lock().await;
- guard.clone().ok_or(IggyError::NotConnected)?
- };
- let (send, recv) = conn.open_bi().await.map_err(|e| {
- error!("Failed to open a bidirectional stream: {e}");
- IggyError::QuicError
- })?;
- Ok(QuinnStreamPair { send, recv })
- })
- }
-}
-
-impl QuinnFactory {
- pub fn new(config: QuicClientConfig) -> Result<Self, IggyError> {
- let cfg = Arc::new(config);
-
- let server_address = cfg
- .server_address
- .parse::<SocketAddr>()
- .map_err(|error| {
- error!("Invalid server address: {error}");
- IggyError::InvalidServerAddress
- })?;
- let client_address = if server_address.is_ipv6()
- && cfg.client_address == QuicClientConfig::default().client_address
- {
- "[::1]:0"
- } else {
- &cfg.client_address
- }
- .parse::<SocketAddr>()
- .map_err(|error| {
- error!("Invalid client address: {error}");
- IggyError::InvalidClientAddress
- })?;
-
- let quic_config = configure(&cfg)?;
- let endpoint = Endpoint::client(client_address);
- if endpoint.is_err() {
- error!("Cannot create client endpoint");
- return Err(IggyError::CannotCreateEndpoint);
- }
-
- let mut endpoint = endpoint.unwrap();
- endpoint.set_default_client_config(quic_config);
-
- Ok(Self { config: cfg, ep: Arc::new(endpoint), server_address,
connection: Arc::new(sync::Mutex::new(None)) })
- }
-}
-
-fn configure(config: &QuicClientConfig) -> Result<ClientConfig, IggyError> {
- let max_concurrent_bidi_streams =
VarInt::try_from(config.max_concurrent_bidi_streams);
- if max_concurrent_bidi_streams.is_err() {
- error!(
- "Invalid 'max_concurrent_bidi_streams': {}",
- config.max_concurrent_bidi_streams
- );
- return Err(IggyError::InvalidConfiguration);
- }
-
- let receive_window = VarInt::try_from(config.receive_window);
- if receive_window.is_err() {
- error!("Invalid 'receive_window': {}", config.receive_window);
- return Err(IggyError::InvalidConfiguration);
- }
-
- let mut transport = quinn::TransportConfig::default();
- transport.initial_mtu(config.initial_mtu);
- transport.send_window(config.send_window);
- transport.receive_window(receive_window.unwrap());
- transport.datagram_send_buffer_size(config.datagram_send_buffer_size as
usize);
-
transport.max_concurrent_bidi_streams(max_concurrent_bidi_streams.unwrap());
- if config.keep_alive_interval > 0 {
-
transport.keep_alive_interval(Some(Duration::from_millis(config.keep_alive_interval)));
- }
- if config.max_idle_timeout > 0 {
- let max_idle_timeout =
-
IdleTimeout::try_from(Duration::from_millis(config.max_idle_timeout));
- if max_idle_timeout.is_err() {
- error!("Invalid 'max_idle_timeout': {}", config.max_idle_timeout);
- return Err(IggyError::InvalidConfiguration);
- }
- transport.max_idle_timeout(Some(max_idle_timeout.unwrap()));
- }
-
- if CryptoProvider::get_default().is_none() {
- if let Err(e) =
rustls::crypto::ring::default_provider().install_default() {
- warn!(
- "Failed to install rustls crypto provider. Error: {:?}. This
may be normal if another thread installed it first.",
- e
- );
- }
- }
- let mut client_config = match config.validate_certificate {
- true => ClientConfig::with_platform_verifier(),
- false => {
- match QuinnQuicClientConfig::try_from(
- rustls::ClientConfig::builder()
- .dangerous()
-
.with_custom_certificate_verifier(SkipServerVerification::new())
- .with_no_client_auth(),
- ) {
- Ok(config) => ClientConfig::new(Arc::new(config)),
- Err(error) => {
- error!("Failed to create QUIC client configuration:
{error}");
- return Err(IggyError::InvalidConfiguration);
- }
- }
- }
- };
- client_config.transport_config(Arc::new(transport));
- Ok(client_config)
-}
+// impl StreamPair for QuinnStreamPair {
+// fn send_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'_>]) ->
Pin<Box<dyn Future<Output = Result<(), IggyError>> + Send + 'a>> {
+// Box::pin(async move {
+// self.send.write_vectored(bufs).await.map_err(|e| {
+// error!("Failed to write vectored buffs to quic conn: {e}");
+// IggyError::QuicError
+// })?;
+// self.send.finish().map_err(|e: quinn::ClosedStream| {
+// error!("Failed to finish sending data: {e}");
+// IggyError::QuicError
+// })?;
+// Ok(())
+// })
+// }
+
+// fn read_buf<'a>(&'a mut self, mut buf: &'a mut BytesMut) -> Pin<Box<dyn
Future<Output = Result<usize, IggyError>> + Send + 'a>> {
+// Box::pin(async move {
+// self.recv.read_buf(&mut buf).await.map_err(|e| {
+// error!("Failed to read chunk: {e}");
+// IggyError::QuicError
+// })
+// })
+// }
+// }
+
+// pub struct QuinnFactory {
+// config: Arc<QuicClientConfig>,
+// ep: Arc<Endpoint>,
+// connection: Arc<sync::Mutex<Option<Connection>>>,
+// server_address: SocketAddr,
+// }
+
+// impl ConnectionFactory for QuinnFactory {
+// fn connect(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>>
+ Send + Sync>> {
+// let ep = self.ep.clone();
+// let sn = self.config.server_name.clone();
+// let sa = self.server_address.clone();
+// let conn = self.connection.clone();
+// Box::pin(async move {
+// let mut connection = conn.lock().await;
+// let connecting = ep
+// .connect(sa, &sn)
+// .map_err(|_| IggyError::CannotEstablishConnection)?;
+
+// let new_conn = connecting
+// .await
+// .map_err(|_| IggyError::CannotEstablishConnection)?;
+// let _ = connection.insert(new_conn);
+// Ok(())
+// })
+// }
+
+// fn is_alive(&self) -> Pin<Box<dyn Future<Output = bool> + Send + Sync>>
{
+// let conn = self.connection.clone();
+// Box::pin(async move {
+// let conn = conn.lock().await;
+// match conn.as_ref() {
+// Some(c) => c.close_reason().is_some(),
+// None => false,
+// }
+// })
+// }
+
+// fn shutdown(&self) -> Pin<Box<dyn Future<Output = Result<(),
IggyError>> + Send + Sync>> {
+// let conn = self.connection.clone();
+// let ep = self.ep.clone();
+// Box::pin(async move {
+// if let Some(conn) = conn.lock().await.take() {
+// conn.close(0u32.into(), b"");
+// }
+// ep.wait_idle().await;
+// Ok(())
+// })
+// }
+// }
+
+// impl StreamConnectionFactory for QuinnFactory {
+// type Stream = QuinnStreamPair;
+
+// fn open_stream(&self) -> Pin<Box<dyn Future<Output =
Result<Self::Stream, IggyError>> + Send + '_>> {
+// let conn = self.connection.clone();
+// Box::pin(async move {
+// let conn = {
+// let guard = conn.lock().await;
+// guard.clone().ok_or(IggyError::NotConnected)?
+// };
+// let (send, recv) = conn.open_bi().await.map_err(|e| {
+// error!("Failed to open a bidirectional stream: {e}");
+// IggyError::QuicError
+// })?;
+// Ok(QuinnStreamPair { send, recv })
+// })
+// }
+// }
+
+// impl QuinnFactory {
+// pub fn new(config: QuicClientConfig) -> Result<Self, IggyError> {
+// let cfg = Arc::new(config);
+
+// let server_address = cfg
+// .server_address
+// .parse::<SocketAddr>()
+// .map_err(|error| {
+// error!("Invalid server address: {error}");
+// IggyError::InvalidServerAddress
+// })?;
+// let client_address = if server_address.is_ipv6()
+// && cfg.client_address ==
QuicClientConfig::default().client_address
+// {
+// "[::1]:0"
+// } else {
+// &cfg.client_address
+// }
+// .parse::<SocketAddr>()
+// .map_err(|error| {
+// error!("Invalid client address: {error}");
+// IggyError::InvalidClientAddress
+// })?;
+
+// let quic_config = configure(&cfg)?;
+// let endpoint = Endpoint::client(client_address);
+// if endpoint.is_err() {
+// error!("Cannot create client endpoint");
+// return Err(IggyError::CannotCreateEndpoint);
+// }
+
+// let mut endpoint = endpoint.unwrap();
+// endpoint.set_default_client_config(quic_config);
+
+// Ok(Self { config: cfg, ep: Arc::new(endpoint), server_address,
connection: Arc::new(sync::Mutex::new(None)) })
+// }
+// }
+
+// fn configure(config: &QuicClientConfig) -> Result<ClientConfig, IggyError> {
+// let max_concurrent_bidi_streams =
VarInt::try_from(config.max_concurrent_bidi_streams);
+// if max_concurrent_bidi_streams.is_err() {
+// error!(
+// "Invalid 'max_concurrent_bidi_streams': {}",
+// config.max_concurrent_bidi_streams
+// );
+// return Err(IggyError::InvalidConfiguration);
+// }
+
+// let receive_window = VarInt::try_from(config.receive_window);
+// if receive_window.is_err() {
+// error!("Invalid 'receive_window': {}", config.receive_window);
+// return Err(IggyError::InvalidConfiguration);
+// }
+
+// let mut transport = quinn::TransportConfig::default();
+// transport.initial_mtu(config.initial_mtu);
+// transport.send_window(config.send_window);
+// transport.receive_window(receive_window.unwrap());
+// transport.datagram_send_buffer_size(config.datagram_send_buffer_size as
usize);
+//
transport.max_concurrent_bidi_streams(max_concurrent_bidi_streams.unwrap());
+// if config.keep_alive_interval > 0 {
+//
transport.keep_alive_interval(Some(Duration::from_millis(config.keep_alive_interval)));
+// }
+// if config.max_idle_timeout > 0 {
+// let max_idle_timeout =
+//
IdleTimeout::try_from(Duration::from_millis(config.max_idle_timeout));
+// if max_idle_timeout.is_err() {
+// error!("Invalid 'max_idle_timeout': {}",
config.max_idle_timeout);
+// return Err(IggyError::InvalidConfiguration);
+// }
+// transport.max_idle_timeout(Some(max_idle_timeout.unwrap()));
+// }
+
+// if CryptoProvider::get_default().is_none() {
+// if let Err(e) =
rustls::crypto::ring::default_provider().install_default() {
+// warn!(
+// "Failed to install rustls crypto provider. Error: {:?}.
This may be normal if another thread installed it first.",
+// e
+// );
+// }
+// }
+// let mut client_config = match config.validate_certificate {
+// true => ClientConfig::with_platform_verifier(),
+// false => {
+// match QuinnQuicClientConfig::try_from(
+// rustls::ClientConfig::builder()
+// .dangerous()
+//
.with_custom_certificate_verifier(SkipServerVerification::new())
+// .with_no_client_auth(),
+// ) {
+// Ok(config) => ClientConfig::new(Arc::new(config)),
+// Err(error) => {
+// error!("Failed to create QUIC client configuration:
{error}");
+// return Err(IggyError::InvalidConfiguration);
+// }
+// }
+// }
+// };
+// client_config.transport_config(Arc::new(transport));
+// Ok(client_config)
+// }
diff --git a/core/sdk/src/connection/tcp/mod.rs
b/core/sdk/src/connection/tcp/mod.rs
index 51993511..3a6a15d8 100644
--- a/core/sdk/src/connection/tcp/mod.rs
+++ b/core/sdk/src/connection/tcp/mod.rs
@@ -11,7 +11,7 @@ use tokio::{
};
use iggy_common::IggyError;
-use crate::connection::StreamPair;
+use crate::{connection::StreamPair, proto::runtime::sync};
pub trait AsyncStream: AsyncRead + AsyncWrite + Unpin + Send + 'static {}
impl<T> AsyncStream for T where T: AsyncRead + AsyncWrite + Unpin + Send +
'static {}
@@ -24,46 +24,50 @@ pub trait SocketFactory {
#[derive(Debug)]
pub struct TokioTcpStream {
- reader: BufReader<OwnedReadHalf>,
- writer: BufWriter<OwnedWriteHalf>,
+ reader: sync::Mutex<BufReader<OwnedReadHalf>>,
+ writer: sync::Mutex<BufWriter<OwnedWriteHalf>>,
}
impl StreamPair for TokioTcpStream {
fn send_vectored<'a>(
- &'a mut self,
+ &'a self,
bufs: &'a [io::IoSlice<'_>],
) -> Pin<Box<dyn Future<Output = Result<(), iggy_common::IggyError>> +
Send + 'a>> {
Box::pin(async move {
- for val in bufs {
- self.writer.write(val).await.map_err(|e| {
- error!(
- "Failed to write data to the TCP connection: {e}",
- );
- IggyError::TcpError
- })?;
- }
- // self.writer.write_vectored(bufs).await.map_err(|e| {
+ let mut w = self.writer.lock().await;
+ w.write_vectored(bufs).await.map_err(|_| IggyError::TcpError)?;
+ w.flush().await.map_err(|_| IggyError::TcpError)?;
+ Ok(())
+ // for val in bufs {
+ // self.writer.write(val).await.map_err(|e| {
+ // error!(
+ // "Failed to write data to the TCP connection: {e}",
+ // );
+ // IggyError::TcpError
+ // })?;
+ // }
+ // // self.writer.write_vectored(bufs).await.map_err(|e| {
+ // // error!(
+ // // "Failed to write data to the TCP connection: {e}",
+ // // );
+ // // IggyError::TcpError
+ // // })?;
+ // self.writer.flush().await.map_err(|e| {
// error!(
// "Failed to write data to the TCP connection: {e}",
// );
// IggyError::TcpError
// })?;
- self.writer.flush().await.map_err(|e| {
- error!(
- "Failed to write data to the TCP connection: {e}",
- );
- IggyError::TcpError
- })?;
- Ok(())
+ // Ok(())
})
}
fn read_buf<'a>(
- &'a mut self,
+ &'a self,
buf: &'a mut BytesMut,
) -> Pin<Box<dyn Future<Output = Result<usize, iggy_common::IggyError>> +
Send + 'a>> {
Box::pin(async move {
- self.reader.read_buf(buf).await.map_err(|e| {
+ self.reader.lock().await.read_buf(buf).await.map_err(|e| {
error!(
"Failed to read data from the TCP connection: {e}",
);
@@ -77,8 +81,8 @@ impl TokioTcpStream {
fn new(stream: TcpStream) -> Self {
let (reader, writer) = stream.into_split();
Self {
- reader: BufReader::new(reader),
- writer: BufWriter::new(writer),
+ reader: sync::Mutex::new(BufReader::new(reader)),
+ writer: sync::Mutex::new(BufWriter::new(writer)),
}
}
}
diff --git a/core/sdk/src/connection/tcp/tcp.rs
b/core/sdk/src/connection/tcp/tcp.rs
index cf5757f4..70e4049b 100644
--- a/core/sdk/src/connection/tcp/tcp.rs
+++ b/core/sdk/src/connection/tcp/tcp.rs
@@ -15,7 +15,7 @@ pub type TokioCompatStream =
tokio_util::compat::Compat<tokio::net::TcpStream>;
pub struct TokioTcpFactory {
pub(crate) config: Arc<TcpClientConfig>,
client_address: Arc<sync::Mutex<Option<SocketAddr>>>,
- pub(crate) stream: Arc<sync::Mutex<Option<TokioTcpStream>>>,
+ pub(crate) stream: Arc<sync::Mutex<Option<Arc<TokioTcpStream>>>>,
}
impl ConnectionFactory for TokioTcpFactory {
@@ -50,7 +50,7 @@ impl ConnectionFactory for TokioTcpFactory {
error!("Failed to set the nodelay option on the client: {e}");
}
// TODO add tls
- let _ = tokio_tcp_stream.insert(TokioTcpStream::new(conn));
+ let _ =
tokio_tcp_stream.insert(Arc::new(TokioTcpStream::new(conn)));
Ok(())
})
@@ -58,21 +58,17 @@ impl ConnectionFactory for TokioTcpFactory {
// TODO пока заглушка, нужно подумать насчет того, как это делать
fn is_alive(&self) -> std::pin::Pin<Box<dyn Future<Output = bool> + Send +
Sync>> {
- let conn = self.stream.clone();
+ let slot = self.stream.clone();
Box::pin(async move {
- let conn = conn.lock().await;
- match conn.as_ref() {
- Some(c) => true,
- None => false,
- }
+ slot.lock().await.is_some()
})
}
fn shutdown(&self) -> std::pin::Pin<Box<dyn Future<Output = Result<(),
IggyError>> + Send + Sync>> {
let conn = self.stream.clone();
Box::pin(async move {
- if let Some(mut conn) = conn.lock().await.take() {
- conn.writer.shutdown().await.map_err(|e| {
+ if let Some(conn) = conn.lock().await.take() {
+ conn.writer.lock().await.shutdown().await.map_err(|e| {
error!(
"Failed to shutdown the TCP connection to the TCP
connection: {e}",
);
diff --git a/core/sdk/src/driver/mod.rs b/core/sdk/src/driver/mod.rs
index dd7c7dc6..d77ae183 100644
--- a/core/sdk/src/driver/mod.rs
+++ b/core/sdk/src/driver/mod.rs
@@ -10,7 +10,7 @@ use tracing::{error, info, trace, warn};
use crate::{
connection::{StreamConnectionFactory, StreamPair},
- connection::quic::QuinnFactory,
+ // connection::quic::QuinnFactory,
proto::{
connection::{IggyCore, InboundResult},
runtime::{Runtime, sync},
@@ -22,98 +22,98 @@ pub trait Driver {
fn register(&self, id: u64, tx: sync::OneShotSender<Bytes>);
}
-pub struct QuicDriver<R>
-where
- R: Runtime,
-{
- core: Arc<sync::Mutex<IggyCore>>,
- rt: Arc<R>,
- notify: Arc<sync::Notify>,
- factory: Arc<QuinnFactory>,
- pub(crate) config: Arc<QuicClientConfig>, // todo change to
driverQuicConfig
- pending: Arc<DashMap<u64, sync::OneShotSender<Bytes>>>,
-}
+// pub struct QuicDriver<R>
+// where
+// R: Runtime,
+// {
+// core: Arc<sync::Mutex<IggyCore>>,
+// rt: Arc<R>,
+// notify: Arc<sync::Notify>,
+// factory: Arc<QuinnFactory>,
+// pub(crate) config: Arc<QuicClientConfig>, // todo change to
driverQuicConfig
+// pending: Arc<DashMap<u64, sync::OneShotSender<Bytes>>>,
+// }
-// TODO add errChan
-impl<R> Driver for QuicDriver<R>
-where
- R: Runtime,
-{
- fn start(&self) {
- let rt = self.rt.clone();
- let nt = self.notify.clone();
- let core = self.core.clone();
- let q = self.factory.clone();
- let cfg = self.config.clone();
- let pending = self.pending.clone();
- rt.spawn(Box::pin(async move {
- let mut rx_buf = BytesMut::new();
- loop {
- nt.notified().await;
+// // TODO add errChan
+// impl<R> Driver for QuicDriver<R>
+// where
+// R: Runtime,
+// {
+// fn start(&self) {
+// let rt = self.rt.clone();
+// let nt = self.notify.clone();
+// let core = self.core.clone();
+// let q = self.factory.clone();
+// let cfg = self.config.clone();
+// let pending = self.pending.clone();
+// rt.spawn(Box::pin(async move {
+// let mut rx_buf = BytesMut::new();
+// loop {
+// nt.notified().await;
- while let Some(data) = {
- let mut guard = core.lock().await;
- guard.poll_transmit()
- } {
- if !pending.contains_key(&data.id) {
- error!("Failed to get transport adapter id");
- continue;
- }
+// while let Some(data) = {
+// let mut guard = core.lock().await;
+// guard.poll_transmit()
+// } {
+// if !pending.contains_key(&data.id) {
+// error!("Failed to get transport adapter id");
+// continue;
+// }
- let mut stream = match q.open_stream().await {
- Ok(s) => s,
- Err(e) => {
- error!("Failed to open a bidirectional stream:
{e}");
- continue;
- }
- };
+// let mut stream = match q.open_stream().await {
+// Ok(s) => s,
+// Err(e) => {
+// error!("Failed to open a bidirectional stream:
{e}");
+// continue;
+// }
+// };
- if let Err(e) =
stream.send_vectored(&data.as_slices()).await {
- error!("Failed to send vectored: {e}");
- break;
- }
+// if let Err(e) =
stream.send_vectored(&data.as_slices()).await {
+// error!("Failed to send vectored: {e}");
+// break;
+// }
- let mut at_most = cfg.response_buffer_size as usize;
- loop {
- rx_buf.reserve(at_most);
+// let mut at_most = cfg.response_buffer_size as usize;
+// loop {
+// rx_buf.reserve(at_most);
- match stream.read_buf(&mut rx_buf).await {
- Ok(0) => { error!("EOF before header/body");
break }
- Ok(n) => n,
- Err(e) => { error!("read_buf failed: {e}");
break }
- };
+// match stream.read_buf(&mut rx_buf).await {
+// Ok(0) => { error!("EOF before header/body");
break }
+// Ok(n) => n,
+// Err(e) => { error!("read_buf failed: {e}");
break }
+// };
- let buf = Cursor::new(&rx_buf[..]);
+// let buf = Cursor::new(&rx_buf[..]);
- let inbound = {
- let mut guard = core.lock().await;
- guard.feed_inbound(buf)
- };
+// let inbound = {
+// let mut guard = core.lock().await;
+// guard.feed_inbound(buf)
+// };
- match inbound {
- InboundResult::Need(need) => at_most = need,
- InboundResult::Ready(position) => {
- let frame = rx_buf.split_to(position).freeze();
- if let Some((_k, tx)) =
pending.remove(&data.id) {
- let _ = tx.send(frame);
- }
- core.lock().await.mark_tx_done();
- at_most = cfg.response_buffer_size as usize;
- continue;
- }
- InboundResult::Error(_) => {
- let mut guard = core.lock().await;
- guard.mark_tx_done();
- break;
- }
- }
- }
- }
- }
- }));
- }
+// match inbound {
+// InboundResult::Need(need) => at_most = need,
+// InboundResult::Ready(position) => {
+// let frame =
rx_buf.split_to(position).freeze();
+// if let Some((_k, tx)) =
pending.remove(&data.id) {
+// let _ = tx.send(frame);
+// }
+// core.lock().await.mark_tx_done();
+// at_most = cfg.response_buffer_size as usize;
+// continue;
+// }
+// InboundResult::Error(_) => {
+// let mut guard = core.lock().await;
+// guard.mark_tx_done();
+// break;
+// }
+// }
+// }
+// }
+// }
+// }));
+// }
- fn register(&self, id: u64, tx: sync::OneShotSender<Bytes>) {
- self.pending.insert(id, tx);
- }
-}
+// fn register(&self, id: u64, tx: sync::OneShotSender<Bytes>) {
+// self.pending.insert(id, tx);
+// }
+// }
diff --git a/core/sdk/src/driver/tcp.rs b/core/sdk/src/driver/tcp.rs
index 2bfabe29..0b5a5a2b 100644
--- a/core/sdk/src/driver/tcp.rs
+++ b/core/sdk/src/driver/tcp.rs
@@ -59,8 +59,10 @@ where
continue;
}
- let mut guard = factory.stream.lock().await;
- let stream = guard.as_mut().unwrap();
+ let stream = match factory.stream.lock().await.clone() {
+ Some(s) => s,
+ None => { error!("Not connected"); break }
+ };
if let Err(e) =
stream.send_vectored(&data.as_slices()).await {
error!("Failed to send vectored: {e}");
@@ -73,9 +75,17 @@ where
rx_buf.reserve(at_most);
match stream.read_buf(&mut rx_buf).await {
- Ok(0) => { error!("EOF before header/body");
break }
+ Ok(0) => {
+ error!("EOF before header/body");
+ panic!("EOF before header/body");
+ break
+ }
Ok(n) => n,
- Err(e) => { error!("read_buf failed: {e}");
break }
+ Err(e) => {
+ error!("read_buf failed: {e}");
+ panic!("read_buf failed");
+ break
+ }
};
let buf = Cursor::new(&rx_buf[..]);
@@ -92,13 +102,14 @@ where
if let Some((_k, tx)) =
pending.remove(&data.id) {
let _ = tx.send(frame);
}
- core.lock().await.mark_tx_done();
+ core.try_lock().unwrap().mark_tx_done();
at_most = init_len;
- continue;
+ break;
}
InboundResult::Error(_) => {
pending.remove(&data.id);
core.lock().await.mark_tx_done();
+ panic!("read_buf failed");
break;
}
}