This is an automated email from the ASF dual-hosted git repository.
bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/draft-sans by this push:
new e8510799 del
e8510799 is described below
commit e85107995bf1341342b7f02ea45272b53b44bdf8
Author: haze518 <[email protected]>
AuthorDate: Mon Jul 7 11:55:30 2025 +0600
del
---
core/common/src/types/command/mod.rs | 4 +-
core/sdk/src/connection/quic/mod.rs | 76 +++++++++++++++++++++++++++++----
core/sdk/src/driver/mod.rs | 74 +++++++++++++++++++++-----------
core/sdk/src/proto/connection.rs | 37 +++++++++++-----
core/sdk/src/proto/runtime.rs | 3 +-
core/sdk/src/quic/quick_client.rs | 2 +-
core/sdk/src/transport_adapter/async.rs | 76 ++++++++++++++++++++++-----------
7 files changed, 197 insertions(+), 75 deletions(-)
diff --git a/core/common/src/types/command/mod.rs
b/core/common/src/types/command/mod.rs
index 4c576381..380273da 100644
--- a/core/common/src/types/command/mod.rs
+++ b/core/common/src/types/command/mod.rs
@@ -19,9 +19,9 @@
use crate::BytesSerializable;
use crate::Validatable;
use crate::error::IggyError;
-use std::fmt::Display;
+use std::fmt::{Display, Debug};
-pub trait Command: BytesSerializable + Validatable<IggyError> + Send + Sync +
Display {
+pub trait Command: BytesSerializable + Validatable<IggyError> + Send + Sync +
Display + Debug {
fn code(&self) -> u32;
}
diff --git a/core/sdk/src/connection/quic/mod.rs
b/core/sdk/src/connection/quic/mod.rs
index c5d9feed..c120f153 100644
--- a/core/sdk/src/connection/quic/mod.rs
+++ b/core/sdk/src/connection/quic/mod.rs
@@ -1,39 +1,99 @@
+use std::io::IoSlice;
use std::sync::Arc;
use std::{io, net::SocketAddr, pin::Pin, time::Duration};
+use bytes::Bytes;
use iggy_common::{IggyError, QuicClientConfig};
use rustls::crypto::CryptoProvider;
+use tokio::io::AsyncWriteExt;
use tracing::{error, warn};
+use crate::proto::runtime::sync;
use crate::quic::skip_server_verification::SkipServerVerification;
use quinn::crypto::rustls::QuicClientConfig as QuinnQuicClientConfig;
-use quinn::{ClientConfig, Connection, Endpoint, IdleTimeout, VarInt};
+use quinn::{ClientConfig, Connection, Endpoint, IdleTimeout, RecvStream,
SendStream, VarInt};
+
+pub trait StreamPair: Send {
+ fn send_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'_>]) -> Pin<Box<dyn
Future<Output = Result<(), IggyError>> + Send + 'a>>;
+ fn read_chunk<'a>(&'a mut self, at_most: usize) -> Pin<Box<dyn
Future<Output = Result<Option<Bytes>, IggyError>> + Send + 'a>>;
+}
pub trait QuicFactory {
- type Conn;
- fn connect(&self) -> Pin<Box<dyn Future<Output = Result<Self::Conn,
IggyError>> + Send>>;
+ type Stream: StreamPair;
+
+ fn connect(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> +
Send>>;
+ fn open_stream(&self) -> Pin<Box<dyn Future<Output = Result<Self::Stream,
IggyError>> + Send + '_>>;
+}
+
+pub struct QuinnStreamPair {
+ send: SendStream,
+ recv: RecvStream,
+}
+
+impl StreamPair for QuinnStreamPair {
+ fn send_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'_>]) -> Pin<Box<dyn
Future<Output = Result<(), IggyError>> + Send + 'a>> {
+ Box::pin(async move {
+ self.send.write_vectored(bufs).await.map_err(|e| {
+ error!("Failed to write vectored buffs to quic conn: {e}");
+ IggyError::QuicError
+ })?;
+ self.send.finish();
+ Ok(())
+ })
+ }
+
+ fn read_chunk<'a>(&'a mut self, at_most: usize) -> Pin<Box<dyn
Future<Output = Result<Option<Bytes>, IggyError>> + Send + 'a>> {
+ Box::pin(async move {
+ let res = self.recv.read_chunk(at_most, true).await.map_err(|e| {
+ error!("Failed to read chunk: {e}");
+ IggyError::QuicError
+ })?;
+ if let Some(data) = res {
+ return Ok(Some(data.bytes));
+ }
+ Ok(None)
+ })
+ }
}
pub struct QuinnFactory {
config: Arc<QuicClientConfig>,
ep: Arc<Endpoint>,
+ connection: Arc<sync::Mutex<Option<Connection>>>,
server_address: SocketAddr,
}
impl QuicFactory for QuinnFactory {
- type Conn = Connection;
+ type Stream = QuinnStreamPair;
- fn connect(&self) -> Pin<Box<dyn Future<Output = Result<Self::Conn,
IggyError>> + Send>> {
+ fn connect(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> +
Send>> {
let ep = self.ep.clone();
let sn = self.config.server_name.clone();
let sa = self.server_address.clone();
+ let conn = self.connection.clone();
Box::pin(async move {
+ let mut connection = conn.lock().await;
let connecting = ep
.connect(sa, &sn)
.map_err(|_| IggyError::CannotEstablishConnection)?;
- connecting
+ let new_conn = connecting
.await
- .map_err(|_| IggyError::CannotEstablishConnection)
+ .map_err(|_| IggyError::CannotEstablishConnection)?;
+ let _ = connection.insert(new_conn);
+ Ok(())
+ })
+ }
+
+ fn open_stream(&self) -> Pin<Box<dyn Future<Output = Result<Self::Stream,
IggyError>> + Send + '_>> {
+ let conn = self.connection.clone();
+ Box::pin(async move {
+ let guard = conn.lock().await;
+ let conn_ref = guard.as_ref().ok_or(IggyError::NotConnected)?;
+ let (send, recv) = conn_ref.open_bi().await.map_err(|e| {
+ error!("Failed to open a bidirectional stream: {e}");
+ IggyError::QuicError
+ })?;
+ Ok(QuinnStreamPair { send, recv })
})
}
}
@@ -72,7 +132,7 @@ impl QuinnFactory {
let mut endpoint = endpoint.unwrap();
endpoint.set_default_client_config(quic_config);
- Ok(Self { config: cfg, ep: Arc::new(endpoint), server_address })
+ Ok(Self { config: cfg, ep: Arc::new(endpoint), server_address,
connection: Arc::new(sync::Mutex::new(None)) })
}
}
diff --git a/core/sdk/src/driver/mod.rs b/core/sdk/src/driver/mod.rs
index 2472c64c..6357a7e3 100644
--- a/core/sdk/src/driver/mod.rs
+++ b/core/sdk/src/driver/mod.rs
@@ -7,7 +7,7 @@ use tokio::io::AsyncWriteExt;
use tracing::{error, info, trace, warn};
use crate::{
- connection::quic::{QuicFactory, QuinnFactory},
+ connection::quic::{QuicFactory, QuinnFactory, StreamPair},
proto::{
connection::{IggyCore, InboundResult},
runtime::{Runtime, sync},
@@ -40,49 +40,71 @@ where
let nt = self.notify.clone();
let core = self.core.clone();
let q = self.factory.clone();
- let cfg: Arc<QuicClientConfig> = self.config.clone();
+ let cfg = self.config.clone();
let pending = self.pending.clone();
rt.spawn(Box::pin(async move {
+ if let Err(e) = q.connect().await {
+ error!("Failed to connect: {e}");
+ return;
+ }
loop {
nt.notified().await;
- while let Some(data) = core.lock().await.poll_transmit() {
+ while let Some(data) = {
+ let mut guard = core.lock().await;
+ guard.poll_transmit()
+ } {
if !pending.contains_key(&data.id) {
error!("Failed to get transport adapter id");
continue;
}
- let connection = q.connect().await.unwrap(); // todo
дорогая операция, перенести хранение connection в структуру quic
- let (mut send, mut recv) = connection
- .open_bi()
- .await
- .map_err(|error| {
- error!("Failed to open a bidirectional stream:
{error}");
- IggyError::QuicError
- })
- .unwrap();
- send.write_vectored(&data.as_slices()).await; // TODO add
map_err
- send.finish().unwrap();
+ let mut stream = match q.open_stream().await {
+ Ok(s) => s,
+ Err(e) => {
+ error!("Failed to open a bidirectional stream:
{e}");
+ continue;
+ }
+ };
+
+ if let Err(e) =
stream.send_vectored(&data.as_slices()).await {
+ error!("Failed to send vectored: {e}");
+ continue;
+ }
- let mut n = cfg.response_buffer_size as usize;
+ let mut at_most = cfg.response_buffer_size as usize;
loop {
- let buffer = recv
- .read_to_end(n)
- .await
- .map_err(|error| {
- error!("Failed to read response data:
{error}");
- IggyError::QuicError
- })
- .unwrap();
- match core.lock().await.feed_inbound(&buffer) {
- InboundResult::Need(need) => n = need,
+ let buffer = match stream.read_chunk(at_most).await {
+ Ok(Some(buf)) => buf,
+ Ok(None) => {
+ error!("Unexpected EOF in stream");
+ break;
+ }
+ Err(e) => {
+ error!("Failed to read response data: {e}");
+ break;
+ }
+ };
+
+ let inbound = {
+ let mut guard = core.lock().await;
+ guard.feed_inbound(&buffer)
+ };
+
+ match inbound {
+ InboundResult::Need(need) => at_most = need,
InboundResult::Response(r) => {
if let Some((_key, tx)) =
pending.remove(&data.id) {
let _ = tx.send(r);
}
+ let mut guard = core.lock().await;
+ guard.mark_tx_done();
+ break;
}
InboundResult::Error(e) => {
- // todo add handle error
+ let mut guard = core.lock().await;
+ guard.mark_tx_done();
+ break;
}
}
}
diff --git a/core/sdk/src/proto/connection.rs b/core/sdk/src/proto/connection.rs
index 1bcc581f..19c04ebb 100644
--- a/core/sdk/src/proto/connection.rs
+++ b/core/sdk/src/proto/connection.rs
@@ -40,6 +40,7 @@ impl TxBuf {
}
}
+#[derive(Debug)]
pub enum Order {
Outbound(Box<dyn Command>),
State(ClientState),
@@ -59,15 +60,15 @@ pub enum InboundResult {
pub struct IggyCore {
state: ClientState,
last_connect: Option<IggyTimestamp>,
- pending: VecDeque<(u32 /* code */, Bytes /* payload */)>,
+ pending: VecDeque<(u32 /* code */, Bytes /* payload */, u64 /*
transport_id */)>,
config: Arc<dyn TransportConfig + Send + Sync + 'static>, // todo rewrite
via generic
retry_count: u32,
- current_tx: Option<TxBuf>,
+ current_tx: Option<Arc<TxBuf>>,
rx_buf: BytesMut,
}
impl IggyCore {
- pub fn write(&mut self, cmd: &impl Command) -> Result<(), IggyError> {
+ pub fn write(&mut self, cmd: &impl Command, id: u64) -> Result<(),
IggyError> {
match self.state {
ClientState::Shutdown => {
trace!("Cannot send data. Client is shutdown.");
@@ -83,7 +84,7 @@ impl IggyCore {
}
_ => {}
}
- self.pending.push_back((cmd.code(), cmd.to_bytes()));
+ self.pending.push_back((cmd.code(), cmd.to_bytes(), id));
Ok(())
}
@@ -140,19 +141,23 @@ impl IggyCore {
Ok(Order::Reconnect)
}
- // TODO вызывать при async fn poll
- pub fn poll_transmit(&mut self) -> Option<&TxBuf> {
+ pub fn poll_transmit(&mut self) -> Option<Arc<TxBuf>> {
if self.current_tx.is_none() {
- let (code, payload) = self.pending.pop_front()?;
+ let (code, payload, id) = self.pending.pop_front()?;
let len = (payload.len() + REQUEST_INITIAL_BYTES_LENGTH) as u32;
- self.current_tx = Some(TxBuf {
+ self.current_tx = Some(Arc::new(TxBuf{
hdr_len: len.to_le_bytes(),
hdr_code: code.to_le_bytes(),
- payload,
- });
+ payload,
+ id,
+ }));
}
- self.current_tx.as_ref()
+ self.current_tx.as_ref().cloned()
+ }
+
+ pub fn mark_tx_done(&mut self) {
+ self.current_tx = None
}
pub fn feed_inbound(&mut self, bytes: &[u8]) -> InboundResult {
@@ -199,4 +204,14 @@ impl IggyCore {
let body = full.split_off(8).freeze();
InboundResult::Response(body)
}
+
+ pub fn on_transport_connected(&mut self) {
+ self.state = ClientState::Connected;
+ self.retry_count = 0;
+ self.last_connect = Some(IggyTimestamp::now());
+ }
+
+ pub fn on_transport_disconnected(&mut self) {
+ self.state = ClientState::Disconnected;
+ }
}
diff --git a/core/sdk/src/proto/runtime.rs b/core/sdk/src/proto/runtime.rs
index 6f4981be..b3227bf2 100644
--- a/core/sdk/src/proto/runtime.rs
+++ b/core/sdk/src/proto/runtime.rs
@@ -1,4 +1,4 @@
-use std::{ops::{Deref, DerefMut}, pin::Pin};
+use std::{ops::{Deref, DerefMut}, pin::Pin, time::Duration};
use iggy_common::IggyError;
@@ -13,6 +13,7 @@ pub mod sync {
pub trait Runtime: Sync + Send + 'static {
fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>);
+ fn sleep(&self, duration: Duration) -> Pin<Box<dyn Future<Output = ()> +
Send>>;
}
#[cfg(feature = "runtime_tokio")]
diff --git a/core/sdk/src/quic/quick_client.rs
b/core/sdk/src/quic/quick_client.rs
index 9317cd27..e3ce4419 100644
--- a/core/sdk/src/quic/quick_client.rs
+++ b/core/sdk/src/quic/quick_client.rs
@@ -20,6 +20,7 @@ use crate::prelude::AutoLogin;
use iggy_binary_protocol::{
BinaryClient, BinaryTransport, Client, PersonalAccessTokenClient,
UserClient,
};
+use tokio::io::AsyncWriteExt;
use crate::prelude::{IggyDuration, IggyError, IggyTimestamp, QuicClientConfig};
use crate::quic::skip_server_verification::SkipServerVerification;
@@ -458,7 +459,6 @@ impl QuicClient {
IggyError::QuicError
})?;
trace!("Sending a QUIC request with code: {code}");
-
send.write_all(&(payload_length as u32).to_le_bytes())
.await
.map_err(|error| {
diff --git a/core/sdk/src/transport_adapter/async.rs
b/core/sdk/src/transport_adapter/async.rs
index 7271ed1a..78960bbc 100644
--- a/core/sdk/src/transport_adapter/async.rs
+++ b/core/sdk/src/transport_adapter/async.rs
@@ -1,57 +1,81 @@
-use std::{pin::Pin, sync::{atomic::AtomicU64, Arc, Mutex}};
+use std::{
+ pin::Pin,
+ sync::{Arc, Mutex, atomic::AtomicU64},
+};
use bytes::Bytes;
use iggy_common::{Command, IggyError};
use tokio::sync::Notify;
+use tracing::{error, trace};
-use crate::{connection::quic::QuicFactory, proto::{connection::{IggyCore,
Order}, runtime::{self, sync, Lockable, Runtime}}, transport_adapter::{RespFut,
TransportAdapter}};
+use crate::{
+ connection::quic::QuicFactory,
+ driver::Driver,
+ proto::{
+ connection::{IggyCore, Order},
+ runtime::{self, Runtime, sync},
+ },
+ transport_adapter::RespFut,
+};
-pub struct AsyncTransportAdapter<F: QuicFactory, R: Runtime> {
+pub struct AsyncTransportAdapter<F: QuicFactory, R: Runtime, D: Driver> {
factory: Arc<F>,
rt: Arc<R>,
core: sync::Mutex<IggyCore>,
notify: Arc<Notify>,
id: AtomicU64,
- driver: Driver,
+ driver: Arc<D>,
}
-impl<F, R> AsyncTransportAdapter<F, R>
+impl<F, R, D> AsyncTransportAdapter<F, R, D>
where
F: QuicFactory + Send + Sync + 'static,
- R: Runtime,
+ R: Runtime + Send + Sync + 'static,
+ D: Driver + Send + Sync,
{
- pub fn send_with_response<'a, T: Command>(&'a self, command: &'a T) ->
Pin<Box<dyn Future<Output = Result<RespFut, IggyError>> + Send + 'a >> {
+ pub fn send_with_response<'a, T: Command>(
+ &'a self,
+ command: &'a T,
+ ) -> Pin<Box<dyn Future<Output = Result<RespFut, IggyError>> + Send + Sync
+ 'a>> {
Box::pin(async move {
let (tx, rx) = runtime::oneshot::<Bytes>();
let current_id = self.id.fetch_add(1,
std::sync::atomic::Ordering::SeqCst);
-
+
self.core.lock().await.write(command, current_id)?;
- self.driver.register(current_id);
- self.notify.notify_one();
+ self.driver.register(current_id, tx);
+ self.notify.notify_waiters();
- Ok(RespFut{rx: rx})
+ Ok(RespFut { rx: rx })
})
}
pub async fn connect(&self) -> Result<(), IggyError> {
- let connect = self.core.lock().await.start_connect()?;
+ let mut order = self.core.lock().await.start_connect()?;
loop {
- match connect {
- Order::Reconnect => {
-
+ match order {
+ Order::Wait(dur) => {
+ self.rt.sleep(dur.get_duration()).await;
+ order = self.core.lock().await.poll_connect()?;
}
+
+ Order::Reconnect => match self.factory.connect().await {
+ Ok(()) => {
+ self.core.lock().await.on_transport_connected();
+ return Ok(());
+ }
+ Err(e) => {
+ self.core.lock().await.on_transport_disconnected();
+ order = self.core.lock().await.poll_connect()?;
+ if matches!(order, Order::Noop) {
+ return Err(e);
+ }
+ }
+ },
+
+ Order::Noop => return Ok(()),
+
+ _ => return Err(IggyError::CannotEstablishConnection),
}
}
}
}
-/*
-// tood для transprot
- fn send_with_response<'a, T: iggy_common::Command>(&'a self, command: &'a
T) -> Pin<Box<dyn Future<Output = Result<bytes::Bytes, iggy_common::IggyError>>
+ Send + 'a>> {
- Box::pin(async move {
- self.core.lock().await.write(command)?;
-
- Ok(Bytes::new())
- })
- }
-
-*/
\ No newline at end of file