This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch draft-sans in repository https://gitbox.apache.org/repos/asf/iggy.git
commit abd7bc34d17f4a7b47138d7fb117dc55e73e53fc Merge: 178f9c19 bcb70f0d Author: haze518 <[email protected]> AuthorDate: Sat Jul 5 16:30:58 2025 +0600 del core/common/src/error/iggy_error.rs | 2 ++ core/sdk/Cargo.toml | 3 ++- core/sdk/src/driver/mod.rs | 31 ++++++++++++++++++++++ core/sdk/src/lib.rs | 1 + core/sdk/src/proto/runtime.rs | 30 +++++++++++++-------- core/sdk/src/transport_adapter/async.rs | 46 +++++++++++++++++++++++++++++++++ core/sdk/src/transport_adapter/mod.rs | 19 +++++++++++--- core/sdk/src/transport_adapter/quic.rs | 42 ------------------------------ 8 files changed, 116 insertions(+), 58 deletions(-) diff --cc core/sdk/src/driver/mod.rs index 00000000,00000000..40bd55b0 new file mode 100644 --- /dev/null +++ b/core/sdk/src/driver/mod.rs @@@ -1,0 -1,0 +1,31 @@@ ++use std::sync::Arc; ++ ++use crate::proto::{connection::IggyCore, runtime::{sync, Runtime}}; ++ ++pub trait Driver { ++ fn start(&self); ++} ++ ++pub struct QuicDriver<R> ++where ++ R: Runtime ++{ ++ core: sync::Mutex<IggyCore>, ++ rt: Arc<R>, ++ notify: Arc<sync::Notify>, ++} ++ ++impl<R> Driver for QuicDriver<R> ++where ++ R: Runtime ++{ ++ fn start(&self) { ++ let rt = self.rt.clone(); ++ let nt = self.notify.clone(); ++ rt.spawn(Box::pin(async { ++ loop { ++ nt.notified().await ++ } ++ })); ++ } ++} diff --cc core/sdk/src/lib.rs index e27a3554,e27a3554..fd55e016 --- a/core/sdk/src/lib.rs +++ b/core/sdk/src/lib.rs @@@ -30,3 -30,3 +30,4 @@@ pub mod proto pub mod transport_factory; pub mod connection; pub mod transport_adapter; ++pub mod driver; diff --cc core/sdk/src/proto/runtime.rs index 7adb1b19,43a382c2..6f4981be --- a/core/sdk/src/proto/runtime.rs +++ b/core/sdk/src/proto/runtime.rs @@@ -1,18 -1,32 +1,26 @@@ use std::{ops::{Deref, DerefMut}, pin::Pin}; - pub trait Runtime: Sync + Send + 'static { - type Mutex<T>: Lockable<T> + Send + Sync + 'static - where - T: Send + 'static; + use iggy_common::IggyError; + ++ + #[cfg(feature = "runtime_tokio")] -type Mutex<T> = tokio::sync::Mutex<T>; -#[cfg(feature = "runtime_tokio")] -pub type OneShotSender<T> = tokio::sync::oneshot::Sender<T>; -#[cfg(feature = "runtime_tokio")] -pub type OneShotReceiver<T> = tokio::sync::oneshot::Receiver<T>; ++pub mod sync { ++ pub type Mutex<T> = tokio::sync::Mutex<T>; ++ pub type OneShotSender<T> = tokio::sync::oneshot::Sender<T>; ++ pub type OneShotReceiver<T> = tokio::sync::oneshot::Receiver<T>; ++ pub type Notify = tokio::sync::Notify; ++} - fn mutex<T: Send + 'static>(&self, value: T) -> Self::Mutex<T>; + pub trait Runtime: Sync + Send + 'static { - type Mutex<T>: Lockable<T> + Send + Sync + 'static // TODO remove - where - T: Send + 'static; - - fn mutex<T: Send + 'static>(&self, value: T) -> Self::Mutex<T>; ++ fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>); } --pub trait Lockable<T>: Send + Sync + 'static { -- type Guard<'a>: Deref<Target = T> + DerefMut + 'a -- where -- Self: 'a, -- T: 'a; - - fn lock(&self) -> Pin<Box<dyn Future<Output = Self::Guard<'_>> + Send + '_>>; ++#[cfg(feature = "runtime_tokio")] ++pub fn oneshot<T>() -> (sync::OneShotSender<T>, sync::OneShotReceiver<T>) { ++ tokio::sync::oneshot::channel() + } - fn lock(&self) -> Pin<Box<dyn Future<Output = Self::Guard<'_>> + Send + '_>>; + #[cfg(feature = "runtime_tokio")] -pub fn oneshot<T>() -> (OneShotSender<T>, OneShotReceiver<T>) { - tokio::sync::oneshot::channel() ++pub fn notify() -> sync::Notify { ++ tokio::sync::Notify::new() } diff --cc core/sdk/src/transport_adapter/async.rs index 00000000,8900df5f..4f8fe6dd mode 000000,100644..100644 --- a/core/sdk/src/transport_adapter/async.rs +++ b/core/sdk/src/transport_adapter/async.rs @@@ -1,0 -1,43 +1,46 @@@ -use std::{collections::VecDeque, pin::Pin, sync::{Arc, Mutex}, task::Waker}; ++use std::{pin::Pin, sync::{atomic::AtomicU64, Arc, Mutex}}; + + use bytes::Bytes; -use futures::{channel::oneshot, FutureExt}; + use iggy_common::{Command, IggyError}; ++use tokio::sync::Notify; + -use crate::{connection::quic::QuicFactory, proto::{connection::IggyCore, runtime::{self, Lockable, Runtime}}, transport_adapter::{RespFut, TransportAdapter}}; ++use crate::{connection::quic::QuicFactory, proto::{connection::IggyCore, runtime::{self, sync, Lockable, Runtime}}, transport_adapter::{RespFut, TransportAdapter}}; + -pub struct QuicAdapter<F: QuicFactory, R: Runtime> { ++pub struct AsyncTransportAdapter<F: QuicFactory, R: Runtime> { + factory: Arc<F>, + rt: Arc<R>, - core: R::Mutex<IggyCore>, - waiters: VecDeque<Waker> ++ core: sync::Mutex<IggyCore>, ++ notify: Arc<Notify>, ++ id: AtomicU64, ++ driver: Driver, + } + -impl<F, R> TransportAdapter for QuicAdapter<F, R> ++impl<F, R> AsyncTransportAdapter<F, R> + where + F: QuicFactory + Send + Sync + 'static, + R: Runtime, + { + fn send_with_response<'a, T: Command>(&'a self, command: &'a T) -> Pin<Box<dyn Future<Output = Result<RespFut, IggyError>> + Send + 'a >> { + Box::pin(async move { + let (tx, rx) = runtime::oneshot::<Bytes>(); - self.core.lock().await.write(command)?; - self.waiters.push_back(value); - Ok(RespFut { rx: rx }) ++ let current_id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst); ++ ++ self.core.lock().await.write(command, current_id)?; ++ self.driver.register(current_id); ++ self.notify.notify_one(); ++ ++ OK(RespFut{rx: rx}) + }) + } + } - - - + /* + // tood для transprot + fn send_with_response<'a, T: iggy_common::Command>(&'a self, command: &'a T) -> Pin<Box<dyn Future<Output = Result<bytes::Bytes, iggy_common::IggyError>> + Send + 'a>> { + Box::pin(async move { + self.core.lock().await.write(command)?; + + Ok(Bytes::new()) + }) + } + + */ diff --cc core/sdk/src/transport_adapter/mod.rs index ee44fefc,c7d6a932..d1656f5f --- a/core/sdk/src/transport_adapter/mod.rs +++ b/core/sdk/src/transport_adapter/mod.rs @@@ -1,10 -1,25 +1,21 @@@ --pub mod quic; ++pub mod r#async; - use std::pin::Pin; + use std::{pin::Pin, sync::Arc}; use bytes::Bytes; + use futures::FutureExt; use iggy_common::{Command, IggyError}; - pub trait Driver { - fn send_with_response<'a, T: Command>(&'a self, command: &'a T) -> Pin<Box<dyn Future<Output = Result<Bytes, IggyError>> + Send + 'a>>; -use crate::proto::runtime::OneShotReceiver; ++use crate::proto::runtime::sync::OneShotReceiver; + + pub struct RespFut { + rx: OneShotReceiver<Bytes> + } + + impl Future for RespFut { + type Output = Result<Bytes, IggyError>; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> { + self.get_mut().rx.poll_unpin(cx).map_err(|_| IggyError::ReceiveError) + } } - -pub trait TransportAdapter { - fn send_with_response<'a, T: Command>(&'a self, command: &'a T) -> Pin<Box<dyn Future<Output = Result<RespFut, IggyError>> + Send + 'a>>; -}
