This is an automated email from the ASF dual-hosted git repository.
bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/draft-sans by this push:
new b9e15339 del
b9e15339 is described below
commit b9e1533989cffe7b90fc87e313614385e5338bef
Author: haze518 <[email protected]>
AuthorDate: Thu Jul 10 07:23:07 2025 +0600
del
---
core/sdk/src/connection/mod.rs | 15 ++++++++++++++-
core/sdk/src/connection/quic/mod.rs | 26 ++++++++++++--------------
core/sdk/src/connection/tcp/mod.rs | 13 +++++++++++++
core/sdk/src/driver/mod.rs | 29 ++++++++++++++---------------
core/sdk/src/proto/connection.rs | 7 +++++--
core/sdk/src/transport_adapter/async.rs | 2 +-
6 files changed, 59 insertions(+), 33 deletions(-)
diff --git a/core/sdk/src/connection/mod.rs b/core/sdk/src/connection/mod.rs
index d65a1318..f6e06522 100644
--- a/core/sdk/src/connection/mod.rs
+++ b/core/sdk/src/connection/mod.rs
@@ -1,5 +1,6 @@
-use std::pin::Pin;
+use std::{io::IoSlice, pin::Pin};
+use bytes::Bytes;
use iggy_common::IggyError;
pub mod tcp;
@@ -10,3 +11,15 @@ pub trait ConnectionFactory {
fn is_alive(&self) -> Pin<Box<dyn Future<Output = bool>>>;
fn shutdown(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> +
Send + Sync>>;
}
+
+pub trait StreamConnectionFactory: ConnectionFactory {
+ type Stream: StreamPair;
+
+ fn open_stream(&self) -> Pin<Box<dyn Future<Output = Result<Self::Stream,
IggyError>> + Send + '_>>;
+}
+
+pub trait StreamPair: Send {
+ fn send_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'_>]) -> Pin<Box<dyn
Future<Output = Result<(), IggyError>> + Send + 'a>>;
+ fn read_chunk<'a>(&'a mut self, at_most: usize) -> Pin<Box<dyn
Future<Output = Result<Option<Bytes>, IggyError>> + Send + 'a>>;
+ fn read_buf<'a>(&'a mut self, buf: &'a mut [u8]) -> Pin<Box<dyn
Future<Output = Result<usize, IggyError>> + Send + 'a>>;
+}
diff --git a/core/sdk/src/connection/quic/mod.rs
b/core/sdk/src/connection/quic/mod.rs
index 422e7f38..647c8531 100644
--- a/core/sdk/src/connection/quic/mod.rs
+++ b/core/sdk/src/connection/quic/mod.rs
@@ -4,26 +4,15 @@ use std::{io, net::SocketAddr, pin::Pin, time::Duration};
use bytes::Bytes;
use iggy_common::{IggyError, QuicClientConfig};
use rustls::crypto::CryptoProvider;
-use tokio::io::AsyncWriteExt;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::{error, warn};
-use crate::connection::ConnectionFactory;
+use crate::connection::{ConnectionFactory, StreamConnectionFactory,
StreamPair};
use crate::proto::runtime::sync;
use crate::quic::skip_server_verification::SkipServerVerification;
use quinn::crypto::rustls::QuicClientConfig as QuinnQuicClientConfig;
use quinn::{ClientConfig, Connection, Endpoint, IdleTimeout, RecvStream,
SendStream, VarInt};
-pub trait StreamPair: Send {
- fn send_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'_>]) -> Pin<Box<dyn
Future<Output = Result<(), IggyError>> + Send + 'a>>;
- fn read_chunk<'a>(&'a mut self, at_most: usize) -> Pin<Box<dyn
Future<Output = Result<Option<Bytes>, IggyError>> + Send + 'a>>;
-}
-
-pub trait QuicFactory: ConnectionFactory {
- type Stream: StreamPair;
-
- fn open_stream(&self) -> Pin<Box<dyn Future<Output = Result<Self::Stream,
IggyError>> + Send + '_>>;
-}
-
pub struct QuinnStreamPair {
send: SendStream,
recv: RecvStream,
@@ -56,6 +45,15 @@ impl StreamPair for QuinnStreamPair {
Ok(None)
})
}
+
+ fn read_buf<'a>(&'a mut self, mut buf: &'a mut [u8]) -> Pin<Box<dyn
Future<Output = Result<usize, IggyError>> + Send + 'a>> {
+ Box::pin(async move {
+ self.recv.read_buf(&mut buf).await.map_err(|e| {
+ error!("Failed to read chunk: {e}");
+ IggyError::QuicError
+ })
+ })
+ }
}
pub struct QuinnFactory {
@@ -109,7 +107,7 @@ impl ConnectionFactory for QuinnFactory {
}
}
-impl QuicFactory for QuinnFactory {
+impl StreamConnectionFactory for QuinnFactory {
type Stream = QuinnStreamPair;
fn open_stream(&self) -> Pin<Box<dyn Future<Output = Result<Self::Stream,
IggyError>> + Send + '_>> {
diff --git a/core/sdk/src/connection/tcp/mod.rs
b/core/sdk/src/connection/tcp/mod.rs
index f4774631..c4f77003 100644
--- a/core/sdk/src/connection/tcp/mod.rs
+++ b/core/sdk/src/connection/tcp/mod.rs
@@ -3,6 +3,9 @@ pub mod tls;
use std::{io, net::SocketAddr, pin::Pin};
use futures::{AsyncRead, AsyncWrite};
+use tokio::{io::{AsyncReadExt, BufReader}, net::tcp::OwnedReadHalf};
+
+use crate::connection::StreamPair;
pub trait AsyncStream: AsyncRead + AsyncWrite + Unpin + Send + 'static {}
impl<T> AsyncStream for T where T: AsyncRead + AsyncWrite + Unpin + Send +
'static {}
@@ -13,3 +16,13 @@ pub trait SocketFactory {
fn connect(&self) -> Pin<Box<dyn Future<Output = io::Result<Self::Stream>>
+ Send>>;
}
+pub struct TokioTcpStream {
+ reader: BufReader<OwnedReadHalf>,
+ buf:
+}
+
+impl StreamPair for TokioTcpStream {
+ fn read_chunk<'a>(&'a mut self, at_most: usize) -> Pin<Box<dyn
Future<Output = Result<Option<bytes::Bytes>, iggy_common::IggyError>> + Send +
'a>> {
+
+ }
+}
diff --git a/core/sdk/src/driver/mod.rs b/core/sdk/src/driver/mod.rs
index 114a12f7..1d794f53 100644
--- a/core/sdk/src/driver/mod.rs
+++ b/core/sdk/src/driver/mod.rs
@@ -1,13 +1,14 @@
use std::sync::Arc;
-use bytes::Bytes;
+use bytes::{Bytes, BytesMut};
use dashmap::DashMap;
use iggy_common::{IggyError, QuicClientConfig};
use tokio::io::AsyncWriteExt;
use tracing::{error, info, trace, warn};
use crate::{
- connection::quic::{QuicFactory, QuinnFactory, StreamPair},
+ connection::{StreamConnectionFactory, StreamPair},
+ connection::quic::QuinnFactory,
proto::{
connection::{IggyCore, InboundResult},
runtime::{Runtime, sync},
@@ -43,6 +44,7 @@ where
let cfg = self.config.clone();
let pending = self.pending.clone();
rt.spawn(Box::pin(async move {
+ let mut rx_buf = BytesMut::with_capacity(cfg.response_buffer_size
as usize);
loop {
nt.notified().await;
@@ -70,31 +72,28 @@ where
let mut at_most = cfg.response_buffer_size as usize;
loop {
- let buffer = match stream.read_chunk(at_most).await {
- Ok(Some(buf)) => buf,
- Ok(None) => {
- error!("Unexpected EOF in stream");
- break;
- }
- Err(e) => {
- error!("Failed to read response data: {e}");
- break;
- }
+ rx_buf.reserve(at_most);
+
+ match stream.read_buf(&mut rx_buf).await {
+ Ok(0) => { error!("EOF before header/body");
break }
+ Ok(n) => n,
+ Err(e) => { error!("read_buf failed: {e}");
break }
};
let inbound = {
let mut guard = core.lock().await;
- guard.feed_inbound(&buffer)
+ guard.feed_inbound(&rx_buf[..])
};
match inbound {
InboundResult::Need(need) => at_most = need,
- InboundResult::Response(r) => {
+ InboundResult::Ready => {
if let Some((_key, tx)) =
pending.remove(&data.id) {
- let _ = tx.send(r);
+ let _ = tx.send(rx_buf);
}
let mut guard = core.lock().await;
guard.mark_tx_done();
+ rx_buf.clear();
break;
}
InboundResult::Error(e) => {
diff --git a/core/sdk/src/proto/connection.rs b/core/sdk/src/proto/connection.rs
index 19c04ebb..d09a6fc9 100644
--- a/core/sdk/src/proto/connection.rs
+++ b/core/sdk/src/proto/connection.rs
@@ -1,6 +1,6 @@
-use std::{collections::VecDeque, pin::Pin, sync::Arc};
+use std::{collections::VecDeque, io::Cursor, pin::Pin, sync::Arc};
-use bytes::{Bytes, BytesMut};
+use bytes::{BufMut, Bytes, BytesMut};
use iggy_common::{ClientState, Command, IggyDuration, IggyError,
IggyErrorDiscriminants, IggyTimestamp};
use std::io::IoSlice;
use tracing::{error, trace};
@@ -160,6 +160,9 @@ impl IggyCore {
self.current_tx = None
}
+ // TODO делоает копию сейчас
+ // нужно сделать так, чтобы оно либо говорило с какого индекса по какой
payload в ready статусе и при этом убрать rx_buf
+ // либо должно копить у себя и передавать свой буфер наверх, который будет
заполняться
pub fn feed_inbound(&mut self, bytes: &[u8]) -> InboundResult {
if self.rx_buf.len() < RESPONSE_INITIAL_BYTES_LENGTH {
let need = RESPONSE_INITIAL_BYTES_LENGTH - self.rx_buf.len();
diff --git a/core/sdk/src/transport_adapter/async.rs
b/core/sdk/src/transport_adapter/async.rs
index e0a90446..33b0a27d 100644
--- a/core/sdk/src/transport_adapter/async.rs
+++ b/core/sdk/src/transport_adapter/async.rs
@@ -10,7 +10,7 @@ use tokio::sync::Notify;
use tracing::{error, trace};
use crate::{
- connection::{quic::QuicFactory, ConnectionFactory},
+ connection::ConnectionFactory,
driver::Driver,
proto::{
connection::{IggyCore, Order},