This is an automated email from the ASF dual-hosted git repository.
bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/draft-sans by this push:
new e000dc1b del
e000dc1b is described below
commit e000dc1b45412289e89ce6902988b1b7c2568c40
Author: haze518 <[email protected]>
AuthorDate: Sun Jul 6 09:35:09 2025 +0600
del
---
core/sdk/src/driver/mod.rs | 78 ++++++++++++++++++++--
core/sdk/src/proto/connection.rs | 1 +
core/sdk/src/transport_adapter/async.rs | 17 ++++-
.../src/transport_adapter/{async.rs => quic.rs} | 29 ++++----
4 files changed, 100 insertions(+), 25 deletions(-)
diff --git a/core/sdk/src/driver/mod.rs b/core/sdk/src/driver/mod.rs
index 40bd55b0..2472c64c 100644
--- a/core/sdk/src/driver/mod.rs
+++ b/core/sdk/src/driver/mod.rs
@@ -1,31 +1,97 @@
use std::sync::Arc;
-use crate::proto::{connection::IggyCore, runtime::{sync, Runtime}};
+use bytes::Bytes;
+use dashmap::DashMap;
+use iggy_common::{IggyError, QuicClientConfig};
+use tokio::io::AsyncWriteExt;
+use tracing::{error, info, trace, warn};
+
+use crate::{
+ connection::quic::{QuicFactory, QuinnFactory},
+ proto::{
+ connection::{IggyCore, InboundResult},
+ runtime::{Runtime, sync},
+ },
+};
pub trait Driver {
fn start(&self);
+ fn register(&self, id: u64, tx: sync::OneShotSender<Bytes>);
}
pub struct QuicDriver<R>
where
- R: Runtime
+ R: Runtime,
{
- core: sync::Mutex<IggyCore>,
+ core: Arc<sync::Mutex<IggyCore>>,
rt: Arc<R>,
notify: Arc<sync::Notify>,
+ factory: Arc<QuinnFactory>,
+ pub(crate) config: Arc<QuicClientConfig>, // todo change to
driverQuicConfig
+ pending: Arc<DashMap<u64, sync::OneShotSender<Bytes>>>,
}
impl<R> Driver for QuicDriver<R>
where
- R: Runtime
+ R: Runtime,
{
fn start(&self) {
let rt = self.rt.clone();
let nt = self.notify.clone();
- rt.spawn(Box::pin(async {
+ let core = self.core.clone();
+ let q = self.factory.clone();
+ let cfg: Arc<QuicClientConfig> = self.config.clone();
+ let pending = self.pending.clone();
+ rt.spawn(Box::pin(async move {
loop {
- nt.notified().await
+ nt.notified().await;
+
+ while let Some(data) = core.lock().await.poll_transmit() {
+ if !pending.contains_key(&data.id) {
+ error!("Failed to get transport adapter id");
+ continue;
+ }
+
+ let connection = q.connect().await.unwrap(); // todo
дорогая операция, перенести хранение connection в структуру quic
+ let (mut send, mut recv) = connection
+ .open_bi()
+ .await
+ .map_err(|error| {
+ error!("Failed to open a bidirectional stream:
{error}");
+ IggyError::QuicError
+ })
+ .unwrap();
+ send.write_vectored(&data.as_slices()).await; // TODO add
map_err
+ send.finish().unwrap();
+
+ let mut n = cfg.response_buffer_size as usize;
+ loop {
+ let buffer = recv
+ .read_to_end(n)
+ .await
+ .map_err(|error| {
+ error!("Failed to read response data:
{error}");
+ IggyError::QuicError
+ })
+ .unwrap();
+ match core.lock().await.feed_inbound(&buffer) {
+ InboundResult::Need(need) => n = need,
+ InboundResult::Response(r) => {
+ if let Some((_key, tx)) =
pending.remove(&data.id) {
+ let _ = tx.send(r);
+ }
+ }
+ InboundResult::Error(e) => {
+ // todo add handle error
+ }
+ }
+ }
+ }
}
}));
}
+
+ fn register(&self, id: u64, tx: sync::OneShotSender<Bytes>) {
+ self.pending.insert(id, tx);
+ }
}
diff --git a/core/sdk/src/proto/connection.rs b/core/sdk/src/proto/connection.rs
index 69d0fd68..1bcc581f 100644
--- a/core/sdk/src/proto/connection.rs
+++ b/core/sdk/src/proto/connection.rs
@@ -24,6 +24,7 @@ pub trait TransportConfig {
}
pub struct TxBuf {
+ pub id: u64,
hdr_len: [u8; 4],
hdr_code: [u8; 4],
payload: Bytes,
diff --git a/core/sdk/src/transport_adapter/async.rs
b/core/sdk/src/transport_adapter/async.rs
index 4f8fe6dd..7271ed1a 100644
--- a/core/sdk/src/transport_adapter/async.rs
+++ b/core/sdk/src/transport_adapter/async.rs
@@ -4,7 +4,7 @@ use bytes::Bytes;
use iggy_common::{Command, IggyError};
use tokio::sync::Notify;
-use crate::{connection::quic::QuicFactory, proto::{connection::IggyCore,
runtime::{self, sync, Lockable, Runtime}}, transport_adapter::{RespFut,
TransportAdapter}};
+use crate::{connection::quic::QuicFactory, proto::{connection::{IggyCore,
Order}, runtime::{self, sync, Lockable, Runtime}}, transport_adapter::{RespFut,
TransportAdapter}};
pub struct AsyncTransportAdapter<F: QuicFactory, R: Runtime> {
factory: Arc<F>,
@@ -20,7 +20,7 @@ where
F: QuicFactory + Send + Sync + 'static,
R: Runtime,
{
- fn send_with_response<'a, T: Command>(&'a self, command: &'a T) ->
Pin<Box<dyn Future<Output = Result<RespFut, IggyError>> + Send + 'a >> {
+ pub fn send_with_response<'a, T: Command>(&'a self, command: &'a T) ->
Pin<Box<dyn Future<Output = Result<RespFut, IggyError>> + Send + 'a >> {
Box::pin(async move {
let (tx, rx) = runtime::oneshot::<Bytes>();
let current_id = self.id.fetch_add(1,
std::sync::atomic::Ordering::SeqCst);
@@ -29,9 +29,20 @@ where
self.driver.register(current_id);
self.notify.notify_one();
- OK(RespFut{rx: rx})
+ Ok(RespFut{rx: rx})
})
}
+
+ pub async fn connect(&self) -> Result<(), IggyError> {
+ let connect = self.core.lock().await.start_connect()?;
+ loop {
+ match connect {
+ Order::Reconnect => {
+
+ }
+ }
+ }
+ }
}
/*
// tood для transprot
diff --git a/core/sdk/src/transport_adapter/async.rs
b/core/sdk/src/transport_adapter/quic.rs
similarity index 54%
copy from core/sdk/src/transport_adapter/async.rs
copy to core/sdk/src/transport_adapter/quic.rs
index 4f8fe6dd..8900df5f 100644
--- a/core/sdk/src/transport_adapter/async.rs
+++ b/core/sdk/src/transport_adapter/quic.rs
@@ -1,21 +1,19 @@
-use std::{pin::Pin, sync::{atomic::AtomicU64, Arc, Mutex}};
+use std::{collections::VecDeque, pin::Pin, sync::{Arc, Mutex}, task::Waker};
use bytes::Bytes;
+use futures::{channel::oneshot, FutureExt};
use iggy_common::{Command, IggyError};
-use tokio::sync::Notify;
-use crate::{connection::quic::QuicFactory, proto::{connection::IggyCore,
runtime::{self, sync, Lockable, Runtime}}, transport_adapter::{RespFut,
TransportAdapter}};
+use crate::{connection::quic::QuicFactory, proto::{connection::IggyCore,
runtime::{self, Lockable, Runtime}}, transport_adapter::{RespFut,
TransportAdapter}};
-pub struct AsyncTransportAdapter<F: QuicFactory, R: Runtime> {
+pub struct QuicAdapter<F: QuicFactory, R: Runtime> {
factory: Arc<F>,
rt: Arc<R>,
- core: sync::Mutex<IggyCore>,
- notify: Arc<Notify>,
- id: AtomicU64,
- driver: Driver,
+ core: R::Mutex<IggyCore>,
+ waiters: VecDeque<Waker>
}
-impl<F, R> AsyncTransportAdapter<F, R>
+impl<F, R> TransportAdapter for QuicAdapter<F, R>
where
F: QuicFactory + Send + Sync + 'static,
R: Runtime,
@@ -23,16 +21,15 @@ where
fn send_with_response<'a, T: Command>(&'a self, command: &'a T) ->
Pin<Box<dyn Future<Output = Result<RespFut, IggyError>> + Send + 'a >> {
Box::pin(async move {
let (tx, rx) = runtime::oneshot::<Bytes>();
- let current_id = self.id.fetch_add(1,
std::sync::atomic::Ordering::SeqCst);
-
- self.core.lock().await.write(command, current_id)?;
- self.driver.register(current_id);
- self.notify.notify_one();
-
- OK(RespFut{rx: rx})
+ self.core.lock().await.write(command)?;
+ self.waiters.push_back(value);
+ Ok(RespFut { rx: rx })
})
}
}
+
+
+
/*
// tood для transprot
fn send_with_response<'a, T: iggy_common::Command>(&'a self, command: &'a
T) -> Pin<Box<dyn Future<Output = Result<bytes::Bytes, iggy_common::IggyError>>
+ Send + 'a>> {