This is an automated email from the ASF dual-hosted git repository.
bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/draft-sans by this push:
new 4309efc0 del
4309efc0 is described below
commit 4309efc0f2956a28838a6f24b6e850b0f4f8f91d
Author: haze518 <[email protected]>
AuthorDate: Sat Jul 12 08:54:20 2025 +0600
del
---
core/integration/tests/sdk/producer/mod.rs | 39 +++++++++---------------------
core/sdk/src/connection/mod.rs | 4 +--
core/sdk/src/connection/quic/mod.rs | 4 +--
core/sdk/src/connection/tcp/mod.rs | 26 ++++++++++++++------
core/sdk/src/connection/tcp/tcp.rs | 1 +
core/sdk/src/driver/mod.rs | 3 ++-
core/sdk/src/driver/tcp.rs | 2 ++
core/sdk/src/proto/connection.rs | 3 +++
core/sdk/src/proto/runtime.rs | 1 +
core/sdk/src/transport_adapter/async.rs | 36 ++++++++++++++++++++++++++-
10 files changed, 77 insertions(+), 42 deletions(-)
diff --git a/core/integration/tests/sdk/producer/mod.rs
b/core/integration/tests/sdk/producer/mod.rs
index b022634a..90f9fe83 100644
--- a/core/integration/tests/sdk/producer/mod.rs
+++ b/core/integration/tests/sdk/producer/mod.rs
@@ -19,6 +19,7 @@
mod background;
use std::sync::Arc;
+use std::time::Duration;
use bytes::Bytes;
use iggy::clients::client::IggyClient;
@@ -28,7 +29,8 @@ use iggy::prelude::*;
use iggy::proto::connection::{IggyCore, IggyCoreConfig};
use iggy::proto::runtime::{sync, TokioRuntime};
use iggy::transport_adapter::r#async::AsyncTransportAdapter;
-use integration::test_server::TestServer;
+use integration::test_server::{login_root, TestServer};
+use tokio::time::sleep;
const STREAM_ID: u32 = 1;
const TOPIC_ID: u32 = 1;
@@ -72,12 +74,13 @@ async fn cleanup(system_client: &IggyClient) {
}
-async fn async_send() {
- let mut test_server = TestServer::default();
- test_server.start();
+#[tokio::test]
+async fn test_async_send() {
+ // let mut test_server = TestServer::default();
+ // test_server.start();
let tcp_client_config = TcpClientConfig {
- server_address: test_server.get_raw_tcp_addr().unwrap(),
+ server_address: "127.0.0.1:8090".to_string(),
..TcpClientConfig::default()
};
@@ -85,30 +88,10 @@ async fn async_send() {
let core =
Arc::new(sync::Mutex::new(IggyCore::new(IggyCoreConfig::default())));
let rt: Arc<TokioRuntime> = Arc::new(TokioRuntime{});
let notify = Arc::new(sync::Notify::new());
- let dirver = TokioTcpDriver::new(core, rt.clone(), notify.clone(),
tcp_factory);
- let adapter = AsyncTransportAdapter::new(tcp_factory, rt, core, dirver,
notify);
+ let dirver = TokioTcpDriver::new(core.clone(), rt.clone(), notify.clone(),
tcp_factory.clone());
+ let adapter = Box::new(AsyncTransportAdapter::new(tcp_factory, rt, core,
dirver, notify));
- adapter.connect().await;
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- let client =
Box::new(TcpClient::create(Arc::new(tcp_client_config)).unwrap());
- let client = IggyClient::create(client, None, None);
+ let client = IggyClient::create(adapter, None, None);
client.connect().await.unwrap();
assert!(client.ping().await.is_ok(), "Failed to ping server");
diff --git a/core/sdk/src/connection/mod.rs b/core/sdk/src/connection/mod.rs
index d9024e5c..e028ad38 100644
--- a/core/sdk/src/connection/mod.rs
+++ b/core/sdk/src/connection/mod.rs
@@ -1,6 +1,6 @@
use std::{io::IoSlice, pin::Pin};
-use bytes::Bytes;
+use bytes::{Bytes, BytesMut};
use iggy_common::IggyError;
pub mod tcp;
@@ -20,5 +20,5 @@ pub trait StreamConnectionFactory: ConnectionFactory {
pub trait StreamPair: Send {
fn send_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'_>]) -> Pin<Box<dyn
Future<Output = Result<(), IggyError>> + Send + 'a>>;
- fn read_buf<'a>(&'a mut self, buf: &'a mut [u8]) -> Pin<Box<dyn
Future<Output = Result<usize, IggyError>> + Send + 'a>>;
+ fn read_buf<'a>(&'a mut self, buf: &'a mut BytesMut) -> Pin<Box<dyn
Future<Output = Result<usize, IggyError>> + Send + 'a>>;
}
diff --git a/core/sdk/src/connection/quic/mod.rs
b/core/sdk/src/connection/quic/mod.rs
index 1733c3b8..0f58b4a1 100644
--- a/core/sdk/src/connection/quic/mod.rs
+++ b/core/sdk/src/connection/quic/mod.rs
@@ -1,7 +1,7 @@
use std::io::IoSlice;
use std::sync::Arc;
use std::{io, net::SocketAddr, pin::Pin, time::Duration};
-use bytes::Bytes;
+use bytes::{Bytes, BytesMut};
use iggy_common::{IggyError, QuicClientConfig};
use rustls::crypto::CryptoProvider;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
@@ -33,7 +33,7 @@ impl StreamPair for QuinnStreamPair {
})
}
- fn read_buf<'a>(&'a mut self, mut buf: &'a mut [u8]) -> Pin<Box<dyn
Future<Output = Result<usize, IggyError>> + Send + 'a>> {
+ fn read_buf<'a>(&'a mut self, mut buf: &'a mut BytesMut) -> Pin<Box<dyn
Future<Output = Result<usize, IggyError>> + Send + 'a>> {
Box::pin(async move {
self.recv.read_buf(&mut buf).await.map_err(|e| {
error!("Failed to read chunk: {e}");
diff --git a/core/sdk/src/connection/tcp/mod.rs
b/core/sdk/src/connection/tcp/mod.rs
index 5c07fffd..51993511 100644
--- a/core/sdk/src/connection/tcp/mod.rs
+++ b/core/sdk/src/connection/tcp/mod.rs
@@ -1,6 +1,7 @@
pub mod tcp;
pub mod tls;
+use bytes::BytesMut;
use futures::{AsyncRead, AsyncWrite};
use tracing::error;
use std::{io, net::SocketAddr, pin::Pin};
@@ -21,6 +22,7 @@ pub trait SocketFactory {
fn connect(&self) -> Pin<Box<dyn Future<Output = io::Result<Self::Stream>>
+ Send>>;
}
+#[derive(Debug)]
pub struct TokioTcpStream {
reader: BufReader<OwnedReadHalf>,
writer: BufWriter<OwnedWriteHalf>,
@@ -32,12 +34,20 @@ impl StreamPair for TokioTcpStream {
bufs: &'a [io::IoSlice<'_>],
) -> Pin<Box<dyn Future<Output = Result<(), iggy_common::IggyError>> +
Send + 'a>> {
Box::pin(async move {
- self.writer.write_vectored(bufs).await.map_err(|e| {
- error!(
- "Failed to write data to the TCP connection: {e}",
- );
- IggyError::TcpError
- })?;
+ for val in bufs {
+ self.writer.write(val).await.map_err(|e| {
+ error!(
+ "Failed to write data to the TCP connection: {e}",
+ );
+ IggyError::TcpError
+ })?;
+ }
+ // self.writer.write_vectored(bufs).await.map_err(|e| {
+ // error!(
+ // "Failed to write data to the TCP connection: {e}",
+ // );
+ // IggyError::TcpError
+ // })?;
self.writer.flush().await.map_err(|e| {
error!(
"Failed to write data to the TCP connection: {e}",
@@ -50,10 +60,10 @@ impl StreamPair for TokioTcpStream {
fn read_buf<'a>(
&'a mut self,
- buf: &'a mut [u8],
+ buf: &'a mut BytesMut,
) -> Pin<Box<dyn Future<Output = Result<usize, iggy_common::IggyError>> +
Send + 'a>> {
Box::pin(async move {
- self.reader.read_exact(buf).await.map_err(|e| {
+ self.reader.read_buf(buf).await.map_err(|e| {
error!(
"Failed to read data from the TCP connection: {e}",
);
diff --git a/core/sdk/src/connection/tcp/tcp.rs
b/core/sdk/src/connection/tcp/tcp.rs
index d2e6f563..cf5757f4 100644
--- a/core/sdk/src/connection/tcp/tcp.rs
+++ b/core/sdk/src/connection/tcp/tcp.rs
@@ -11,6 +11,7 @@ use tracing::error;
pub type TokioCompatStream = tokio_util::compat::Compat<tokio::net::TcpStream>;
+#[derive(Debug)]
pub struct TokioTcpFactory {
pub(crate) config: Arc<TcpClientConfig>,
client_address: Arc<sync::Mutex<Option<SocketAddr>>>,
diff --git a/core/sdk/src/driver/mod.rs b/core/sdk/src/driver/mod.rs
index 672274a8..dd7c7dc6 100644
--- a/core/sdk/src/driver/mod.rs
+++ b/core/sdk/src/driver/mod.rs
@@ -34,6 +34,7 @@ where
pending: Arc<DashMap<u64, sync::OneShotSender<Bytes>>>,
}
+// TODO add errChan
impl<R> Driver for QuicDriver<R>
where
R: Runtime,
@@ -69,7 +70,7 @@ where
if let Err(e) =
stream.send_vectored(&data.as_slices()).await {
error!("Failed to send vectored: {e}");
- continue;
+ break;
}
let mut at_most = cfg.response_buffer_size as usize;
diff --git a/core/sdk/src/driver/tcp.rs b/core/sdk/src/driver/tcp.rs
index d6809edf..2bfabe29 100644
--- a/core/sdk/src/driver/tcp.rs
+++ b/core/sdk/src/driver/tcp.rs
@@ -7,6 +7,7 @@ use tracing::error;
use crate::{connection::{tcp::tcp::TokioTcpFactory, StreamPair},
driver::Driver, proto::{connection::{IggyCore, InboundResult}, runtime::{sync,
Runtime}}};
+#[derive(Debug)]
pub struct TokioTcpDriver<R>
where
R: Runtime
@@ -48,6 +49,7 @@ where
let mut rx_buf = BytesMut::new();
loop {
nt.notified().await;
+ // TODO убирать txBuf, если не удается его прочитать и
отсылать ошибку
while let Some(data) = {
let mut guard = core.lock().await;
guard.poll_transmit()
diff --git a/core/sdk/src/proto/connection.rs b/core/sdk/src/proto/connection.rs
index 8d5e396d..2517b0d4 100644
--- a/core/sdk/src/proto/connection.rs
+++ b/core/sdk/src/proto/connection.rs
@@ -18,6 +18,7 @@ const ALREADY_EXISTS_STATUSES: &[u32] = &[
IggyErrorDiscriminants::ConsumerGroupNameAlreadyExists as u32,
];
+#[derive(Debug)]
pub struct IggyCoreConfig {
max_retries: Option<u32>,
reestablish_after: IggyDuration,
@@ -29,6 +30,7 @@ impl Default for IggyCoreConfig {
}
}
+#[derive(Debug)]
pub struct TxBuf {
pub id: u64,
hdr_len: [u8; 4],
@@ -63,6 +65,7 @@ pub enum InboundResult {
Error(IggyError),
}
+#[derive(Debug)]
pub struct IggyCore {
pub(crate) state: ClientState,
last_connect: Option<IggyTimestamp>,
diff --git a/core/sdk/src/proto/runtime.rs b/core/sdk/src/proto/runtime.rs
index 022a5adf..3bb7bf0b 100644
--- a/core/sdk/src/proto/runtime.rs
+++ b/core/sdk/src/proto/runtime.rs
@@ -26,6 +26,7 @@ pub fn notify() -> sync::Notify {
tokio::sync::Notify::new()
}
+#[derive(Debug)]
pub struct TokioRuntime;
impl Runtime for TokioRuntime {
diff --git a/core/sdk/src/transport_adapter/async.rs
b/core/sdk/src/transport_adapter/async.rs
index 2ca08c89..7f422bad 100644
--- a/core/sdk/src/transport_adapter/async.rs
+++ b/core/sdk/src/transport_adapter/async.rs
@@ -5,7 +5,7 @@ use async_broadcast::{Receiver, Sender, broadcast};
use async_trait::async_trait;
use bytes::Bytes;
-use iggy_binary_protocol::BinaryTransport;
+use iggy_binary_protocol::{BinaryClient, BinaryTransport, Client};
use iggy_common::{ClientState, Command, DiagnosticEvent, IggyDuration,
IggyError};
use tokio::sync::Notify;
use tracing::{error, trace};
@@ -20,6 +20,7 @@ use crate::{
transport_adapter::RespFut,
};
+#[derive(Debug)]
pub struct AsyncTransportAdapter<F: ConnectionFactory, R: Runtime, D: Driver> {
factory: Arc<F>,
rt: Arc<R>,
@@ -77,6 +78,8 @@ where
return Ok(());
}
Err(e) => {
+ error!("got error: {e}");
+ panic!("{e}");
self.core.lock().await.on_transport_disconnected();
order = self.core.lock().await.poll_connect()?;
if matches!(order, Order::Noop) {
@@ -165,3 +168,34 @@ where
}
}
}
+
+#[async_trait]
+impl<F, R, D> Client for AsyncTransportAdapter<F, R, D>
+where
+ F: ConnectionFactory + Send + Sync + 'static + std::fmt::Debug,
+ R: Runtime + Send + Sync + 'static + std::fmt::Debug,
+ D: Driver + Send + Sync + std::fmt::Debug,
+{
+ async fn connect(&self) -> Result<(), IggyError> {
+ AsyncTransportAdapter::connect(self).await
+ }
+
+ async fn disconnect(&self) -> Result<(), IggyError> {
+ AsyncTransportAdapter::shutdown(self).await
+ }
+
+ async fn shutdown(&self) -> Result<(), IggyError> {
+ AsyncTransportAdapter::shutdown(self).await
+ }
+
+ async fn subscribe_events(&self) -> Receiver<DiagnosticEvent> {
+ self.events.1.clone()
+ }
+}
+
+impl<F, R, D> BinaryClient for AsyncTransportAdapter<F, R, D>
+where
+ F: ConnectionFactory + Send + Sync + 'static + std::fmt::Debug,
+ R: Runtime + Send + Sync + 'static + std::fmt::Debug,
+ D: Driver + Send + Sync + std::fmt::Debug,
+{}