This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/draft-sans by this push:
     new e000dc1b del
e000dc1b is described below

commit e000dc1b45412289e89ce6902988b1b7c2568c40
Author: haze518 <[email protected]>
AuthorDate: Sun Jul 6 09:35:09 2025 +0600

    del
---
 core/sdk/src/driver/mod.rs                         | 78 ++++++++++++++++++++--
 core/sdk/src/proto/connection.rs                   |  1 +
 core/sdk/src/transport_adapter/async.rs            | 17 ++++-
 .../src/transport_adapter/{async.rs => quic.rs}    | 29 ++++----
 4 files changed, 100 insertions(+), 25 deletions(-)

diff --git a/core/sdk/src/driver/mod.rs b/core/sdk/src/driver/mod.rs
index 40bd55b0..2472c64c 100644
--- a/core/sdk/src/driver/mod.rs
+++ b/core/sdk/src/driver/mod.rs
@@ -1,31 +1,97 @@
 use std::sync::Arc;
 
-use crate::proto::{connection::IggyCore, runtime::{sync, Runtime}};
+use bytes::Bytes;
+use dashmap::DashMap;
+use iggy_common::{IggyError, QuicClientConfig};
+use tokio::io::AsyncWriteExt;
+use tracing::{error, info, trace, warn};
+
+use crate::{
+    connection::quic::{QuicFactory, QuinnFactory},
+    proto::{
+        connection::{IggyCore, InboundResult},
+        runtime::{Runtime, sync},
+    },
+};
 
 pub trait Driver {
     fn start(&self);
+    fn register(&self, id: u64, tx: sync::OneShotSender<Bytes>);
 }
 
 pub struct QuicDriver<R>
 where
-    R: Runtime
+    R: Runtime,
 {
-    core: sync::Mutex<IggyCore>,
+    core: Arc<sync::Mutex<IggyCore>>,
     rt: Arc<R>,
     notify: Arc<sync::Notify>,
+    factory: Arc<QuinnFactory>,
+    pub(crate) config: Arc<QuicClientConfig>, // todo change to driverQuicConfig
+    pending: Arc<DashMap<u64, sync::OneShotSender<Bytes>>>,
 }
 
 impl<R> Driver for QuicDriver<R>
 where
-    R: Runtime
+    R: Runtime,
 {
     fn start(&self) {
         let rt = self.rt.clone();
         let nt = self.notify.clone();
-        rt.spawn(Box::pin(async {
+        let core = self.core.clone();
+        let q = self.factory.clone();
+        let cfg: Arc<QuicClientConfig> = self.config.clone();
+        let pending = self.pending.clone();
+        rt.spawn(Box::pin(async move {
             loop {
-                nt.notified().await
+                nt.notified().await;
+
+                while let Some(data) = core.lock().await.poll_transmit() {
+                    if !pending.contains_key(&data.id) {
+                        error!("Failed to get transport adapter id");
+                        continue;
+                    }
+
+                    let connection = q.connect().await.unwrap(); // todo дорогая операция, перенести хранение connection в структуру quic
+                    let (mut send, mut recv) = connection
+                        .open_bi()
+                        .await
+                        .map_err(|error| {
+                            error!("Failed to open a bidirectional stream: {error}");
+                            IggyError::QuicError
+                        })
+                        .unwrap();
+                    send.write_vectored(&data.as_slices()).await; // TODO add map_err
+                    send.finish().unwrap();
+
+                    let mut n = cfg.response_buffer_size as usize;
+                    loop {
+                        let buffer = recv
+                            .read_to_end(n)
+                            .await
+                            .map_err(|error| {
+                                error!("Failed to read response data: {error}");
+                                IggyError::QuicError
+                            })
+                            .unwrap();
+                        match core.lock().await.feed_inbound(&buffer) {
+                            InboundResult::Need(need) => n = need,
+                            InboundResult::Response(r) => {
+                                if let Some((_key, tx)) = pending.remove(&data.id) {
+                                    let _ = tx.send(r);
+                                }
+                            }
+                            InboundResult::Error(e) => {
+                                // todo add handle error
+                            }
+                        }
+                    }
+                }
             }
         }));
     }
+
+    fn register(&self, id: u64, tx: sync::OneShotSender<Bytes>) {
+        self.pending.insert(id, tx);
+    }
 }
diff --git a/core/sdk/src/proto/connection.rs b/core/sdk/src/proto/connection.rs
index 69d0fd68..1bcc581f 100644
--- a/core/sdk/src/proto/connection.rs
+++ b/core/sdk/src/proto/connection.rs
@@ -24,6 +24,7 @@ pub trait TransportConfig {
 }
 
 pub struct TxBuf {
+    pub id: u64,
     hdr_len: [u8; 4],
     hdr_code: [u8; 4],
     payload: Bytes,
diff --git a/core/sdk/src/transport_adapter/async.rs b/core/sdk/src/transport_adapter/async.rs
index 4f8fe6dd..7271ed1a 100644
--- a/core/sdk/src/transport_adapter/async.rs
+++ b/core/sdk/src/transport_adapter/async.rs
@@ -4,7 +4,7 @@ use bytes::Bytes;
 use iggy_common::{Command, IggyError};
 use tokio::sync::Notify;
 
-use crate::{connection::quic::QuicFactory, proto::{connection::IggyCore, runtime::{self, sync, Lockable, Runtime}}, transport_adapter::{RespFut, TransportAdapter}};
+use crate::{connection::quic::QuicFactory, proto::{connection::{IggyCore, Order}, runtime::{self, sync, Lockable, Runtime}}, transport_adapter::{RespFut, TransportAdapter}};
 
 pub struct AsyncTransportAdapter<F: QuicFactory, R: Runtime> {
     factory: Arc<F>,
@@ -20,7 +20,7 @@ where
     F: QuicFactory + Send + Sync + 'static,
     R: Runtime,
 {
-    fn send_with_response<'a, T: Command>(&'a self, command: &'a T) -> Pin<Box<dyn Future<Output = Result<RespFut, IggyError>> + Send + 'a >> {
+    pub fn send_with_response<'a, T: Command>(&'a self, command: &'a T) -> Pin<Box<dyn Future<Output = Result<RespFut, IggyError>> + Send + 'a >> {
         Box::pin(async move {
             let (tx, rx) = runtime::oneshot::<Bytes>();
             let current_id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
@@ -29,9 +29,20 @@ where
             self.driver.register(current_id);
             self.notify.notify_one();
 
-            OK(RespFut{rx: rx})
+            Ok(RespFut{rx: rx})
         })
     }
+
+    pub async fn connect(&self) -> Result<(), IggyError> {
+        let connect = self.core.lock().await.start_connect()?;
+        loop {
+            match connect {
+                Order::Reconnect => {
+                    
+                }
+            }
+        }
+    }
 }
 /*
 // tood для transprot
diff --git a/core/sdk/src/transport_adapter/async.rs b/core/sdk/src/transport_adapter/quic.rs
similarity index 54%
copy from core/sdk/src/transport_adapter/async.rs
copy to core/sdk/src/transport_adapter/quic.rs
index 4f8fe6dd..8900df5f 100644
--- a/core/sdk/src/transport_adapter/async.rs
+++ b/core/sdk/src/transport_adapter/quic.rs
@@ -1,21 +1,19 @@
-use std::{pin::Pin, sync::{atomic::AtomicU64, Arc, Mutex}};
+use std::{collections::VecDeque, pin::Pin, sync::{Arc, Mutex}, task::Waker};
 
 use bytes::Bytes;
+use futures::{channel::oneshot, FutureExt};
 use iggy_common::{Command, IggyError};
-use tokio::sync::Notify;
 
-use crate::{connection::quic::QuicFactory, proto::{connection::IggyCore, runtime::{self, sync, Lockable, Runtime}}, transport_adapter::{RespFut, TransportAdapter}};
+use crate::{connection::quic::QuicFactory, proto::{connection::IggyCore, runtime::{self, Lockable, Runtime}}, transport_adapter::{RespFut, TransportAdapter}};
 
-pub struct AsyncTransportAdapter<F: QuicFactory, R: Runtime> {
+pub struct QuicAdapter<F: QuicFactory, R: Runtime> {
     factory: Arc<F>,
     rt: Arc<R>,
-    core: sync::Mutex<IggyCore>,
-    notify: Arc<Notify>,
-    id: AtomicU64,
-    driver: Driver,
+    core: R::Mutex<IggyCore>,
+    waiters: VecDeque<Waker>
 }
 
-impl<F, R> AsyncTransportAdapter<F, R>
+impl<F, R> TransportAdapter for QuicAdapter<F, R>
 where
     F: QuicFactory + Send + Sync + 'static,
     R: Runtime,
@@ -23,16 +21,15 @@ where
     fn send_with_response<'a, T: Command>(&'a self, command: &'a T) -> Pin<Box<dyn Future<Output = Result<RespFut, IggyError>> + Send + 'a >> {
         Box::pin(async move {
             let (tx, rx) = runtime::oneshot::<Bytes>();
-            let current_id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
-            
-            self.core.lock().await.write(command, current_id)?;
-            self.driver.register(current_id);
-            self.notify.notify_one();
-
-            OK(RespFut{rx: rx})
+            self.core.lock().await.write(command)?;
+            self.waiters.push_back(value);
+            Ok(RespFut { rx: rx })
         })
     }
 }
+
+
+
 /*
 // tood для transprot
     fn send_with_response<'a, T: iggy_common::Command>(&'a self, command: &'a T) -> Pin<Box<dyn Future<Output = Result<bytes::Bytes, iggy_common::IggyError>> + Send + 'a>> {

Reply via email to