This is an automated email from the ASF dual-hosted git repository.
bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/draft-sans by this push:
new a9252e1b test
a9252e1b is described below
commit a9252e1b4c473f289216173396df358c27d10351
Author: haze518 <[email protected]>
AuthorDate: Mon Jul 14 11:00:31 2025 +0600
test
---
core/integration/tests/sdk/producer/mod.rs | 6 +++---
core/sdk/src/connection/tcp/mod.rs | 2 +-
core/sdk/src/driver/tcp.rs | 8 +++-----
core/sdk/src/proto/connection.rs | 17 ++++++++++++-----
4 files changed, 19 insertions(+), 14 deletions(-)
diff --git a/core/integration/tests/sdk/producer/mod.rs
b/core/integration/tests/sdk/producer/mod.rs
index 90f9fe83..7834d903 100644
--- a/core/integration/tests/sdk/producer/mod.rs
+++ b/core/integration/tests/sdk/producer/mod.rs
@@ -76,11 +76,11 @@ async fn cleanup(system_client: &IggyClient) {
#[tokio::test]
async fn test_async_send() {
- // let mut test_server = TestServer::default();
- // test_server.start();
+ let mut test_server = TestServer::default();
+ test_server.start();
let tcp_client_config = TcpClientConfig {
- server_address: "127.0.0.1:8090".to_string(),
+ server_address: test_server.get_raw_tcp_addr().unwrap(),
..TcpClientConfig::default()
};
diff --git a/core/sdk/src/connection/tcp/mod.rs
b/core/sdk/src/connection/tcp/mod.rs
index 3a6a15d8..5f733990 100644
--- a/core/sdk/src/connection/tcp/mod.rs
+++ b/core/sdk/src/connection/tcp/mod.rs
@@ -36,7 +36,7 @@ impl StreamPair for TokioTcpStream {
Box::pin(async move {
let mut w = self.writer.lock().await;
w.write_vectored(bufs).await.map_err(|_| IggyError::TcpError)?;
- w.flush().await.map_err(|_| IggyError::TcpError)?;
+ // w.flush().await.map_err(|_| IggyError::TcpError)?;
Ok(())
// for val in bufs {
// self.writer.write(val).await.map_err(|e| {
diff --git a/core/sdk/src/driver/tcp.rs b/core/sdk/src/driver/tcp.rs
index 0e1eec26..818f226b 100644
--- a/core/sdk/src/driver/tcp.rs
+++ b/core/sdk/src/driver/tcp.rs
@@ -1,6 +1,6 @@
use std::{io::Cursor, sync::Arc};
-use bytes::{Bytes, BytesMut};
+use bytes::{Buf, Bytes, BytesMut};
use dashmap::DashMap;
use iggy_common::IggyError;
use tracing::error;
@@ -88,17 +88,15 @@ where
}
};
- let buf = Cursor::new(&rx_buf[..]);
-
let inbound = {
let mut guard = core.lock().await;
- guard.feed_inbound(buf)
+ guard.feed_inbound(&rx_buf[..])
};
match inbound {
InboundResult::Need(need) => at_most = need,
InboundResult::Ready(start, end) => {
- let _ = rx_buf.split_to(start);
+ rx_buf.advance(start);
let frame = rx_buf.split_to(end -
start).freeze();
if let Some((_k, tx)) =
pending.remove(&data.id) {
let _ = tx.send(frame);
diff --git a/core/sdk/src/proto/connection.rs b/core/sdk/src/proto/connection.rs
index 4da78360..62e86264 100644
--- a/core/sdk/src/proto/connection.rs
+++ b/core/sdk/src/proto/connection.rs
@@ -183,14 +183,21 @@ impl IggyCore {
RESPONSE_INITIAL_BYTES_LENGTH
}
- pub fn feed_inbound(&mut self, mut cur: Cursor<&[u8]>) -> InboundResult {
- let buf_len = cur.get_ref().len();
+ pub fn feed_inbound(&mut self, cur: &[u8]) -> InboundResult {
+ let buf_len = cur.len();
if buf_len < RESPONSE_INITIAL_BYTES_LENGTH {
return InboundResult::Need(RESPONSE_INITIAL_BYTES_LENGTH -
buf_len);
}
- let status = cur.get_u32_le();
- let length = cur.get_u32_le();
+ let status = match cur[..4].try_into() {
+ Ok(bytes) => u32::from_le_bytes(bytes),
+ Err(_) => return
InboundResult::Error(IggyError::InvalidNumberEncoding),
+ };
+
+ let length = match cur[4..8].try_into() {
+ Ok(bytes) => u32::from_le_bytes(bytes),
+ Err(_) => return
InboundResult::Error(IggyError::InvalidNumberEncoding),
+ };
if status != 0 {
if ALREADY_EXISTS_STATUSES.contains(&status) {
@@ -219,7 +226,7 @@ impl IggyCore {
return InboundResult::Need(total - buf_len);
}
- InboundResult::Ready(cur.position() as usize, total)
+ InboundResult::Ready(8, total)
}
pub fn on_transport_connected(&mut self) {