This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit abd7bc34d17f4a7b47138d7fb117dc55e73e53fc
Merge: 178f9c19 bcb70f0d
Author: haze518 <[email protected]>
AuthorDate: Sat Jul 5 16:30:58 2025 +0600

    del

 core/common/src/error/iggy_error.rs     |  2 ++
 core/sdk/Cargo.toml                     |  3 ++-
 core/sdk/src/driver/mod.rs              | 31 ++++++++++++++++++++++
 core/sdk/src/lib.rs                     |  1 +
 core/sdk/src/proto/runtime.rs           | 30 +++++++++++++--------
 core/sdk/src/transport_adapter/async.rs | 46 +++++++++++++++++++++++++++++++++
 core/sdk/src/transport_adapter/mod.rs   | 19 +++++++++++---
 core/sdk/src/transport_adapter/quic.rs  | 42 ------------------------------
 8 files changed, 116 insertions(+), 58 deletions(-)

diff --cc core/sdk/src/driver/mod.rs
index 00000000,00000000..40bd55b0
new file mode 100644
--- /dev/null
+++ b/core/sdk/src/driver/mod.rs
@@@ -1,0 -1,0 +1,31 @@@
++use std::sync::Arc;
++
++use crate::proto::{connection::IggyCore, runtime::{sync, Runtime}};
++
++pub trait Driver {
++    fn start(&self);
++}
++
++pub struct QuicDriver<R>
++where
++    R: Runtime
++{
++    core: sync::Mutex<IggyCore>,
++    rt: Arc<R>,
++    notify: Arc<sync::Notify>,
++}
++
++impl<R> Driver for QuicDriver<R>
++where
++    R: Runtime
++{
++    fn start(&self) {
++        let rt = self.rt.clone();
++        let nt = self.notify.clone();
++        rt.spawn(Box::pin(async {
++            loop {
++                nt.notified().await
++            }
++        }));
++    }
++}
diff --cc core/sdk/src/lib.rs
index e27a3554,e27a3554..fd55e016
--- a/core/sdk/src/lib.rs
+++ b/core/sdk/src/lib.rs
@@@ -30,3 -30,3 +30,4 @@@ pub mod proto
  pub mod transport_factory;
  pub mod connection;
  pub mod transport_adapter;
++pub mod driver;
diff --cc core/sdk/src/proto/runtime.rs
index 7adb1b19,43a382c2..6f4981be
--- a/core/sdk/src/proto/runtime.rs
+++ b/core/sdk/src/proto/runtime.rs
@@@ -1,18 -1,32 +1,26 @@@
  use std::{ops::{Deref, DerefMut}, pin::Pin};
  
- pub trait Runtime: Sync + Send + 'static {
-     type Mutex<T>: Lockable<T> + Send + Sync + 'static
-     where
-         T: Send + 'static;
+ use iggy_common::IggyError;
+ 
++
+ #[cfg(feature = "runtime_tokio")]
 -type Mutex<T> = tokio::sync::Mutex<T>;
 -#[cfg(feature = "runtime_tokio")]
 -pub type OneShotSender<T> = tokio::sync::oneshot::Sender<T>;
 -#[cfg(feature = "runtime_tokio")]
 -pub type OneShotReceiver<T> = tokio::sync::oneshot::Receiver<T>;
++pub mod sync {
++    pub type Mutex<T> = tokio::sync::Mutex<T>;
++    pub type OneShotSender<T> = tokio::sync::oneshot::Sender<T>;
++    pub type OneShotReceiver<T> = tokio::sync::oneshot::Receiver<T>;
++    pub type Notify = tokio::sync::Notify;
++}
  
-     fn mutex<T: Send + 'static>(&self, value: T) -> Self::Mutex<T>;
+ pub trait Runtime: Sync + Send + 'static {
 -    type Mutex<T>: Lockable<T> + Send + Sync + 'static // TODO remove
 -    where
 -        T: Send + 'static;
 -
 -    fn mutex<T: Send + 'static>(&self, value: T) -> Self::Mutex<T>;
++    fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>);
  }
  
--pub trait Lockable<T>: Send + Sync + 'static {
--    type Guard<'a>: Deref<Target = T> + DerefMut + 'a
--    where
--        Self: 'a,
--        T: 'a;
 -
 -    fn lock(&self) -> Pin<Box<dyn Future<Output = Self::Guard<'_>> + Send + '_>>;
++#[cfg(feature = "runtime_tokio")]
++pub fn oneshot<T>() -> (sync::OneShotSender<T>, sync::OneShotReceiver<T>) {
++    tokio::sync::oneshot::channel()
+ }
  
-     fn lock(&self) -> Pin<Box<dyn Future<Output = Self::Guard<'_>> + Send + '_>>;
+ #[cfg(feature = "runtime_tokio")]
 -pub fn oneshot<T>() -> (OneShotSender<T>, OneShotReceiver<T>) {
 -    tokio::sync::oneshot::channel()
++pub fn notify() -> sync::Notify {
++    tokio::sync::Notify::new()
  }
diff --cc core/sdk/src/transport_adapter/async.rs
index 00000000,8900df5f..4f8fe6dd
mode 000000,100644..100644
--- a/core/sdk/src/transport_adapter/async.rs
+++ b/core/sdk/src/transport_adapter/async.rs
@@@ -1,0 -1,43 +1,46 @@@
 -use std::{collections::VecDeque, pin::Pin, sync::{Arc, Mutex}, task::Waker};
++use std::{pin::Pin, sync::{atomic::AtomicU64, Arc, Mutex}};
+ 
+ use bytes::Bytes;
 -use futures::{channel::oneshot, FutureExt};
+ use iggy_common::{Command, IggyError};
++use tokio::sync::Notify;
+ 
 -use crate::{connection::quic::QuicFactory, proto::{connection::IggyCore, runtime::{self, Lockable, Runtime}}, transport_adapter::{RespFut, TransportAdapter}};
++use crate::{connection::quic::QuicFactory, proto::{connection::IggyCore, runtime::{self, sync, Lockable, Runtime}}, transport_adapter::{RespFut, TransportAdapter}};
+ 
 -pub struct QuicAdapter<F: QuicFactory, R: Runtime> {
++pub struct AsyncTransportAdapter<F: QuicFactory, R: Runtime> {
+     factory: Arc<F>,
+     rt: Arc<R>,
 -    core: R::Mutex<IggyCore>,
 -    waiters: VecDeque<Waker>
++    core: sync::Mutex<IggyCore>,
++    notify: Arc<Notify>,
++    id: AtomicU64,
++    driver: Driver,
+ }
+ 
 -impl<F, R> TransportAdapter for QuicAdapter<F, R>
++impl<F, R> AsyncTransportAdapter<F, R>
+ where
+     F: QuicFactory + Send + Sync + 'static,
+     R: Runtime,
+ {
+     fn send_with_response<'a, T: Command>(&'a self, command: &'a T) -> Pin<Box<dyn Future<Output = Result<RespFut, IggyError>> + Send + 'a >> {
+         Box::pin(async move {
+             let (tx, rx) = runtime::oneshot::<Bytes>();
 -            self.core.lock().await.write(command)?;
 -            self.waiters.push_back(value);
 -            Ok(RespFut { rx: rx })
++            let current_id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
++            
++            self.core.lock().await.write(command, current_id)?;
++            self.driver.register(current_id);
++            self.notify.notify_one();
++
++            OK(RespFut{rx: rx})
+         })
+     }
+ }
 -
 -
 -
+ /*
+ // tood для transprot
+     fn send_with_response<'a, T: iggy_common::Command>(&'a self, command: &'a T) -> Pin<Box<dyn Future<Output = Result<bytes::Bytes, iggy_common::IggyError>> + Send + 'a>> {
+         Box::pin(async move {
+             self.core.lock().await.write(command)?;
+ 
+             Ok(Bytes::new())
+         })
+     }
+ 
+ */
diff --cc core/sdk/src/transport_adapter/mod.rs
index ee44fefc,c7d6a932..d1656f5f
--- a/core/sdk/src/transport_adapter/mod.rs
+++ b/core/sdk/src/transport_adapter/mod.rs
@@@ -1,10 -1,25 +1,21 @@@
--pub mod quic;
++pub mod r#async;
  
- use std::pin::Pin;
+ use std::{pin::Pin, sync::Arc};
  
  use bytes::Bytes;
+ use futures::FutureExt;
  use iggy_common::{Command, IggyError};
  
- pub trait Driver {
-     fn send_with_response<'a, T: Command>(&'a self, command: &'a T) -> Pin<Box<dyn Future<Output = Result<Bytes, IggyError>> + Send + 'a>>;
 -use crate::proto::runtime::OneShotReceiver;
++use crate::proto::runtime::sync::OneShotReceiver;
+ 
+ pub struct RespFut {
+     rx: OneShotReceiver<Bytes>
+ }
+ 
+ impl Future for RespFut {
+     type Output = Result<Bytes, IggyError>;
+ 
+     fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
+         self.get_mut().rx.poll_unpin(cx).map_err(|_| IggyError::ReceiveError)
+     }
  }
 -
 -pub trait TransportAdapter {
 -    fn send_with_response<'a, T: Command>(&'a self, command: &'a T) -> Pin<Box<dyn Future<Output = Result<RespFut, IggyError>> + Send + 'a>>;
 -}

Reply via email to