This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/draft-sans by this push:
     new 4309efc0 del
4309efc0 is described below

commit 4309efc0f2956a28838a6f24b6e850b0f4f8f91d
Author: haze518 <[email protected]>
AuthorDate: Sat Jul 12 08:54:20 2025 +0600

    del
---
 core/integration/tests/sdk/producer/mod.rs | 39 +++++++++---------------------
 core/sdk/src/connection/mod.rs             |  4 +--
 core/sdk/src/connection/quic/mod.rs        |  4 +--
 core/sdk/src/connection/tcp/mod.rs         | 26 ++++++++++++++------
 core/sdk/src/connection/tcp/tcp.rs         |  1 +
 core/sdk/src/driver/mod.rs                 |  3 ++-
 core/sdk/src/driver/tcp.rs                 |  2 ++
 core/sdk/src/proto/connection.rs           |  3 +++
 core/sdk/src/proto/runtime.rs              |  1 +
 core/sdk/src/transport_adapter/async.rs    | 36 ++++++++++++++++++++++++++-
 10 files changed, 77 insertions(+), 42 deletions(-)

diff --git a/core/integration/tests/sdk/producer/mod.rs 
b/core/integration/tests/sdk/producer/mod.rs
index b022634a..90f9fe83 100644
--- a/core/integration/tests/sdk/producer/mod.rs
+++ b/core/integration/tests/sdk/producer/mod.rs
@@ -19,6 +19,7 @@
 mod background;
 
 use std::sync::Arc;
+use std::time::Duration;
 
 use bytes::Bytes;
 use iggy::clients::client::IggyClient;
@@ -28,7 +29,8 @@ use iggy::prelude::*;
 use iggy::proto::connection::{IggyCore, IggyCoreConfig};
 use iggy::proto::runtime::{sync, TokioRuntime};
 use iggy::transport_adapter::r#async::AsyncTransportAdapter;
-use integration::test_server::TestServer;
+use integration::test_server::{login_root, TestServer};
+use tokio::time::sleep;
 
 const STREAM_ID: u32 = 1;
 const TOPIC_ID: u32 = 1;
@@ -72,12 +74,13 @@ async fn cleanup(system_client: &IggyClient) {
 }
 
 
-async fn async_send() {
-    let mut test_server = TestServer::default();
-    test_server.start();
+#[tokio::test]
+async fn test_async_send() {
+    // let mut test_server = TestServer::default();
+    // test_server.start();
 
     let tcp_client_config = TcpClientConfig {
-        server_address: test_server.get_raw_tcp_addr().unwrap(),
+        server_address: "127.0.0.1:8090".to_string(),
         ..TcpClientConfig::default()
     };
     
@@ -85,30 +88,10 @@ async fn async_send() {
     let core = 
Arc::new(sync::Mutex::new(IggyCore::new(IggyCoreConfig::default())));
     let rt: Arc<TokioRuntime> = Arc::new(TokioRuntime{});
     let notify = Arc::new(sync::Notify::new());
-    let dirver = TokioTcpDriver::new(core, rt.clone(), notify.clone(), 
tcp_factory);
-    let adapter = AsyncTransportAdapter::new(tcp_factory, rt, core, dirver, 
notify);
+    let dirver = TokioTcpDriver::new(core.clone(), rt.clone(), notify.clone(), 
tcp_factory.clone());
+    let adapter = Box::new(AsyncTransportAdapter::new(tcp_factory, rt, core, 
dirver, notify));
 
-    adapter.connect().await;
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-    
-
-    let client = 
Box::new(TcpClient::create(Arc::new(tcp_client_config)).unwrap());
-    let client = IggyClient::create(client, None, None);
+    let client = IggyClient::create(adapter, None, None);
 
     client.connect().await.unwrap();
     assert!(client.ping().await.is_ok(), "Failed to ping server");
diff --git a/core/sdk/src/connection/mod.rs b/core/sdk/src/connection/mod.rs
index d9024e5c..e028ad38 100644
--- a/core/sdk/src/connection/mod.rs
+++ b/core/sdk/src/connection/mod.rs
@@ -1,6 +1,6 @@
 use std::{io::IoSlice, pin::Pin};
 
-use bytes::Bytes;
+use bytes::{Bytes, BytesMut};
 use iggy_common::IggyError;
 
 pub mod tcp;
@@ -20,5 +20,5 @@ pub trait StreamConnectionFactory: ConnectionFactory {
 
 pub trait StreamPair: Send {
     fn send_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'_>]) -> Pin<Box<dyn 
Future<Output = Result<(), IggyError>> + Send + 'a>>;
-    fn read_buf<'a>(&'a mut self, buf: &'a mut [u8]) -> Pin<Box<dyn 
Future<Output = Result<usize, IggyError>> + Send + 'a>>;
+    fn read_buf<'a>(&'a mut self, buf: &'a mut BytesMut) -> Pin<Box<dyn 
Future<Output = Result<usize, IggyError>> + Send + 'a>>;
 }
diff --git a/core/sdk/src/connection/quic/mod.rs 
b/core/sdk/src/connection/quic/mod.rs
index 1733c3b8..0f58b4a1 100644
--- a/core/sdk/src/connection/quic/mod.rs
+++ b/core/sdk/src/connection/quic/mod.rs
@@ -1,7 +1,7 @@
 use std::io::IoSlice;
 use std::sync::Arc;
 use std::{io, net::SocketAddr, pin::Pin, time::Duration};
-use bytes::Bytes;
+use bytes::{Bytes, BytesMut};
 use iggy_common::{IggyError, QuicClientConfig};
 use rustls::crypto::CryptoProvider;
 use tokio::io::{AsyncReadExt, AsyncWriteExt};
@@ -33,7 +33,7 @@ impl StreamPair for QuinnStreamPair {
         })
     }
 
-    fn read_buf<'a>(&'a mut self, mut buf: &'a mut [u8]) -> Pin<Box<dyn 
Future<Output = Result<usize, IggyError>> + Send + 'a>> {
+    fn read_buf<'a>(&'a mut self, mut buf: &'a mut BytesMut) -> Pin<Box<dyn 
Future<Output = Result<usize, IggyError>> + Send + 'a>> {
         Box::pin(async move {
             self.recv.read_buf(&mut buf).await.map_err(|e| {
                 error!("Failed to read chunk: {e}");
diff --git a/core/sdk/src/connection/tcp/mod.rs 
b/core/sdk/src/connection/tcp/mod.rs
index 5c07fffd..51993511 100644
--- a/core/sdk/src/connection/tcp/mod.rs
+++ b/core/sdk/src/connection/tcp/mod.rs
@@ -1,6 +1,7 @@
 pub mod tcp;
 pub mod tls;
 
+use bytes::BytesMut;
 use futures::{AsyncRead, AsyncWrite};
 use tracing::error;
 use std::{io, net::SocketAddr, pin::Pin};
@@ -21,6 +22,7 @@ pub trait SocketFactory {
     fn connect(&self) -> Pin<Box<dyn Future<Output = io::Result<Self::Stream>> 
+ Send>>;
 }
 
+#[derive(Debug)]
 pub struct TokioTcpStream {
     reader: BufReader<OwnedReadHalf>,
     writer: BufWriter<OwnedWriteHalf>,
@@ -32,12 +34,20 @@ impl StreamPair for TokioTcpStream {
         bufs: &'a [io::IoSlice<'_>],
     ) -> Pin<Box<dyn Future<Output = Result<(), iggy_common::IggyError>> + 
Send + 'a>> {
         Box::pin(async move {
-            self.writer.write_vectored(bufs).await.map_err(|e| {
-                error!(
-                    "Failed to write data to the TCP connection: {e}",
-                );
-                IggyError::TcpError
-            })?;
+            for val in bufs {
+                self.writer.write(val).await.map_err(|e| {
+                    error!(
+                        "Failed to write data to the TCP connection: {e}",
+                    );
+                    IggyError::TcpError
+                })?;
+            }
+            // self.writer.write_vectored(bufs).await.map_err(|e| {
+            //     error!(
+            //         "Failed to write data to the TCP connection: {e}",
+            //     );
+            //     IggyError::TcpError
+            // })?;
             self.writer.flush().await.map_err(|e| {
                 error!(
                     "Failed to write data to the TCP connection: {e}",
@@ -50,10 +60,10 @@ impl StreamPair for TokioTcpStream {
 
     fn read_buf<'a>(
         &'a mut self,
-        buf: &'a mut [u8],
+        buf: &'a mut BytesMut,
     ) -> Pin<Box<dyn Future<Output = Result<usize, iggy_common::IggyError>> + 
Send + 'a>> {
         Box::pin(async move {
-            self.reader.read_exact(buf).await.map_err(|e| {
+            self.reader.read_buf(buf).await.map_err(|e| {
                 error!(
                     "Failed to read data from the TCP connection: {e}",
                 );
diff --git a/core/sdk/src/connection/tcp/tcp.rs 
b/core/sdk/src/connection/tcp/tcp.rs
index d2e6f563..cf5757f4 100644
--- a/core/sdk/src/connection/tcp/tcp.rs
+++ b/core/sdk/src/connection/tcp/tcp.rs
@@ -11,6 +11,7 @@ use tracing::error;
 
 pub type TokioCompatStream = tokio_util::compat::Compat<tokio::net::TcpStream>;
 
+#[derive(Debug)]
 pub struct TokioTcpFactory {
     pub(crate) config: Arc<TcpClientConfig>,
     client_address: Arc<sync::Mutex<Option<SocketAddr>>>,
diff --git a/core/sdk/src/driver/mod.rs b/core/sdk/src/driver/mod.rs
index 672274a8..dd7c7dc6 100644
--- a/core/sdk/src/driver/mod.rs
+++ b/core/sdk/src/driver/mod.rs
@@ -34,6 +34,7 @@ where
     pending: Arc<DashMap<u64, sync::OneShotSender<Bytes>>>,
 }
 
+// TODO add errChan
 impl<R> Driver for QuicDriver<R>
 where
     R: Runtime,
@@ -69,7 +70,7 @@ where
 
                     if let Err(e) = 
stream.send_vectored(&data.as_slices()).await {
                         error!("Failed to send vectored: {e}");
-                        continue;
+                        break;
                     }
 
                     let mut at_most = cfg.response_buffer_size as usize;
diff --git a/core/sdk/src/driver/tcp.rs b/core/sdk/src/driver/tcp.rs
index d6809edf..2bfabe29 100644
--- a/core/sdk/src/driver/tcp.rs
+++ b/core/sdk/src/driver/tcp.rs
@@ -7,6 +7,7 @@ use tracing::error;
 
 use crate::{connection::{tcp::tcp::TokioTcpFactory, StreamPair}, 
driver::Driver, proto::{connection::{IggyCore, InboundResult}, runtime::{sync, 
Runtime}}};
 
+#[derive(Debug)]
 pub struct TokioTcpDriver<R>
 where
     R: Runtime
@@ -48,6 +49,7 @@ where
             let mut rx_buf = BytesMut::new();
             loop {
                 nt.notified().await;
+                // TODO убирать txBuf, если не удается его прочитать и 
отсылать ошибку
                 while let Some(data) = {
                     let mut guard = core.lock().await;
                     guard.poll_transmit()
diff --git a/core/sdk/src/proto/connection.rs b/core/sdk/src/proto/connection.rs
index 8d5e396d..2517b0d4 100644
--- a/core/sdk/src/proto/connection.rs
+++ b/core/sdk/src/proto/connection.rs
@@ -18,6 +18,7 @@ const ALREADY_EXISTS_STATUSES: &[u32] = &[
     IggyErrorDiscriminants::ConsumerGroupNameAlreadyExists as u32,
 ];
 
+#[derive(Debug)]
 pub struct IggyCoreConfig {
     max_retries: Option<u32>,
     reestablish_after: IggyDuration,
@@ -29,6 +30,7 @@ impl Default for IggyCoreConfig {
     }
 }
 
+#[derive(Debug)]
 pub struct TxBuf {
     pub id: u64,
     hdr_len: [u8; 4],
@@ -63,6 +65,7 @@ pub enum InboundResult {
     Error(IggyError),   
 }
 
+#[derive(Debug)]
 pub struct IggyCore {
     pub(crate) state: ClientState,
     last_connect: Option<IggyTimestamp>,
diff --git a/core/sdk/src/proto/runtime.rs b/core/sdk/src/proto/runtime.rs
index 022a5adf..3bb7bf0b 100644
--- a/core/sdk/src/proto/runtime.rs
+++ b/core/sdk/src/proto/runtime.rs
@@ -26,6 +26,7 @@ pub fn notify() -> sync::Notify {
     tokio::sync::Notify::new()
 }
 
+#[derive(Debug)]
 pub struct TokioRuntime;
 
 impl Runtime for TokioRuntime {
diff --git a/core/sdk/src/transport_adapter/async.rs 
b/core/sdk/src/transport_adapter/async.rs
index 2ca08c89..7f422bad 100644
--- a/core/sdk/src/transport_adapter/async.rs
+++ b/core/sdk/src/transport_adapter/async.rs
@@ -5,7 +5,7 @@ use async_broadcast::{Receiver, Sender, broadcast};
 
 use async_trait::async_trait;
 use bytes::Bytes;
-use iggy_binary_protocol::BinaryTransport;
+use iggy_binary_protocol::{BinaryClient, BinaryTransport, Client};
 use iggy_common::{ClientState, Command, DiagnosticEvent, IggyDuration, 
IggyError};
 use tokio::sync::Notify;
 use tracing::{error, trace};
@@ -20,6 +20,7 @@ use crate::{
     transport_adapter::RespFut,
 };
 
+#[derive(Debug)]
 pub struct AsyncTransportAdapter<F: ConnectionFactory, R: Runtime, D: Driver> {
     factory: Arc<F>,
     rt: Arc<R>,
@@ -77,6 +78,8 @@ where
                         return Ok(());
                     }
                     Err(e) => {
+                        error!("got error: {e}");
+                        panic!("{e}");
                         self.core.lock().await.on_transport_disconnected();
                         order = self.core.lock().await.poll_connect()?;
                         if matches!(order, Order::Noop) {
@@ -165,3 +168,34 @@ where
         }
     }
 }
+
+#[async_trait]
+impl<F, R, D> Client for AsyncTransportAdapter<F, R, D> 
+where
+    F: ConnectionFactory + Send + Sync + 'static + std::fmt::Debug,
+    R: Runtime + Send + Sync + 'static + std::fmt::Debug,
+    D: Driver + Send + Sync + std::fmt::Debug,
+{
+    async fn connect(&self) -> Result<(), IggyError> {
+        AsyncTransportAdapter::connect(self).await
+    }
+
+    async fn disconnect(&self) -> Result<(), IggyError> {
+        AsyncTransportAdapter::shutdown(self).await
+    }
+
+    async fn shutdown(&self) -> Result<(), IggyError> {
+        AsyncTransportAdapter::shutdown(self).await
+    }
+
+    async fn subscribe_events(&self) -> Receiver<DiagnosticEvent> {
+        self.events.1.clone()
+    }
+}
+
+impl<F, R, D> BinaryClient for AsyncTransportAdapter<F, R, D> 
+where
+    F: ConnectionFactory + Send + Sync + 'static + std::fmt::Debug,
+    R: Runtime + Send + Sync + 'static + std::fmt::Debug,
+    D: Driver + Send + Sync + std::fmt::Debug,
+{}

Reply via email to