This is an automated email from the ASF dual-hosted git repository.
bashirbekov pushed a commit to branch draft-sans-new
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/draft-sans-new by this push:
new 85c68e54 test
85c68e54 is described below
commit 85c68e544e36b01cf36e43503356dc2a60a01c77
Author: haze518 <[email protected]>
AuthorDate: Mon Aug 18 09:13:25 2025 +0600
test
---
CLAUDE.md | 0
Cargo.lock | 1 +
core/integration/src/tcp_client.rs | 18 ++++----
core/sdk/Cargo.toml | 1 +
core/sdk/src/connection/mod.rs | 86 ++++++++++++++++++--------------------
results | 35 ++++++----------
6 files changed, 65 insertions(+), 76 deletions(-)
diff --git a/CLAUDE.md b/CLAUDE.md
deleted file mode 100644
index e69de29b..00000000
diff --git a/Cargo.lock b/Cargo.lock
index 6abbab02..514bf0e4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3818,6 +3818,7 @@ dependencies = [
"iggy_common",
"mockall",
"num_cpus",
+ "parking_lot 0.12.4",
"quinn",
"reqwest",
"reqwest-middleware",
diff --git a/core/integration/src/tcp_client.rs
b/core/integration/src/tcp_client.rs
index 6b9af2bb..ea0f816f 100644
--- a/core/integration/src/tcp_client.rs
+++ b/core/integration/src/tcp_client.rs
@@ -48,18 +48,18 @@ impl ClientFactory for TcpClientFactory {
Box::pin(tokio_tcp(addr))
});
- // let client = NewTcpClient::create(Arc::new(config),
factory).unwrap_or_else(|e| {
- // panic!(
- // "Failed to create NewTcpClient, iggy-server has address {},
error: {:?}",
- // self.server_addr, e
- // )
- // });
- let client = TcpClient::create(Arc::new(config)).unwrap_or_else(|e| {
+ let client = NewTcpClient::create(Arc::new(config),
factory).unwrap_or_else(|e| {
panic!(
- "Failed to create TcpClient, iggy-server has address {},
error: {:?}",
+ "Failed to create NewTcpClient, iggy-server has address {},
error: {:?}",
self.server_addr, e
)
});
+ // let client = TcpClient::create(Arc::new(config)).unwrap_or_else(|e|
{
+ // panic!(
+ // "Failed to create TcpClient, iggy-server has address {},
error: {:?}",
+ // self.server_addr, e
+ // )
+ // });
Client::connect(&client).await.unwrap_or_else(|e| {
if self.tls_enabled {
@@ -77,7 +77,7 @@ impl ClientFactory for TcpClientFactory {
)
}
});
- ClientWrapper::Tcp(client)
+ ClientWrapper::New(client)
}
fn transport(&self) -> Transport {
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index 47554f6d..351427c4 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -51,6 +51,7 @@ futures-util = { workspace = true }
iggy_binary_protocol = { workspace = true }
iggy_common = { workspace = true }
num_cpus = "1.17.0"
+parking_lot = "0.12.4"
quinn = { workspace = true }
reqwest = { workspace = true }
reqwest-middleware = { workspace = true }
diff --git a/core/sdk/src/connection/mod.rs b/core/sdk/src/connection/mod.rs
index 0d3b8b70..836cb121 100644
--- a/core/sdk/src/connection/mod.rs
+++ b/core/sdk/src/connection/mod.rs
@@ -1,23 +1,13 @@
use std::{
- collections::{HashMap, VecDeque},
- fmt::Debug,
- io,
- net::SocketAddr,
- ops::Deref,
- pin::Pin,
- str::FromStr,
- sync::{
- Arc, Mutex,
- atomic::{AtomicBool, AtomicU64, Ordering},
- },
- task::{Context, Poll, Waker, ready},
- time::Duration,
+ collections::{HashMap, VecDeque}, fmt::Debug, io, mem::MaybeUninit,
net::SocketAddr, ops::Deref, pin::Pin, str::FromStr, sync::{
+ atomic::{AtomicBool, AtomicU64, Ordering}, Arc
+ }, task::{ready, Context, Poll, Waker}, time::Duration
};
use crate::protocol::{ControlAction, ProtocolCore, ProtocolCoreConfig,
Response, TxBuf};
use async_broadcast::{Receiver, Sender, broadcast};
use async_trait::async_trait;
-use bytes::{Buf, Bytes, BytesMut};
+use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures::{AsyncRead, AsyncWrite, FutureExt, future::poll_fn,
task::AtomicWaker};
use iggy_binary_protocol::{BinaryClient, BinaryTransport, Client};
use iggy_common::{
@@ -25,9 +15,9 @@ use iggy_common::{
};
use tokio::{
net::TcpStream,
- sync::Mutex as TokioMutex,
time::{Sleep, sleep},
};
+use parking_lot::Mutex;
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt,
TokioAsyncWriteCompatExt};
use tracing::{debug, error, info, trace, warn};
@@ -114,8 +104,8 @@ impl<S: AsyncIO> ConnectionRef<S> {
socket: None,
current_send: None,
send_offset: 0,
- recv_buffer: BytesMut::with_capacity(16384), // Pre-allocate
larger buffer
- recv_scratch: vec![0u8; 8192], // Reusable scratch buffer
+ recv_buffer: BytesMut::with_capacity(16 * 1024), //
Pre-allocate larger buffer
+ // recv_scratch: vec![0u8; 8192], // Reusable scratch buffer
wait_timer: None,
config,
waiters: Arc::new(Waiters {
@@ -134,7 +124,7 @@ impl<S: AsyncIO> ConnectionRef<S> {
impl<S: AsyncIO> ConnectionRef<S> {
fn state(&self) -> ClientState {
- let state = self.0.state.lock().unwrap();
+ let state = self.0.state.lock();
state.inner.core.state()
}
}
@@ -173,7 +163,7 @@ impl<T> Waiters<T> {
}
fn alloc(&self) -> RequestId {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
- self.map.lock().unwrap().insert(
+ self.map.lock().insert(
id,
WaitEntry {
waker: AtomicWaker::new(),
@@ -183,7 +173,7 @@ impl<T> Waiters<T> {
id
}
fn complete(&self, id: RequestId, val: T) -> bool {
- if let Some(entry) = self.map.lock().unwrap().get_mut(&id) {
+ if let Some(entry) = self.map.lock().get_mut(&id) {
entry.result = Some(val);
entry.waker.wake();
true
@@ -201,7 +191,7 @@ struct WaitFuture<T> {
impl<T> Future for WaitFuture<T> {
type Output = T;
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
- let mut waiters = self.waiters.map.lock().unwrap();
+ let mut waiters = self.waiters.map.lock();
if let Some(val) = {
if let Some(e) = waiters.get_mut(&self.id) {
e.result.take()
@@ -245,7 +235,8 @@ where
current_send: Option<TxBuf>,
send_offset: usize,
recv_buffer: BytesMut,
- recv_scratch: Vec<u8>, // Reusable scratch buffer to avoid allocations
+
+ // recv_scratch: Vec<u8>, // Reusable scratch buffer to avoid allocations
wait_timer: Option<Pin<Box<Sleep>>>,
config: Arc<TcpClientConfig>,
@@ -450,40 +441,45 @@ where
}
fn drive_receive(&mut self, cx: &mut Context<'_>) -> io::Result<bool> {
- if self.socket.is_none() {
- // No socket means we're not connected - this is not an error state
- return Ok(false);
- }
+ // let Some(socket) = self.socket.as_mut() else {
+ // return Ok(false); // нет сокета — нет приёма
+ // };
let mut progress = false;
- // Limit read attempts to prevent blocking the executor
for _ in 0..16 {
+ if self.recv_buffer.spare_capacity_mut().is_empty() {
+ self.recv_buffer.reserve(8192);
+ }
+
+ let spare: &mut [MaybeUninit<u8>] =
self.recv_buffer.spare_capacity_mut();
+
+ let buf: &mut [u8] = unsafe {
+ &mut *(spare as *mut [MaybeUninit<u8>] as *mut [u8])
+ };
+
let n = {
- let socket = self
- .socket
+ let socket = self.socket
.as_mut()
.ok_or(io::Error::new(io::ErrorKind::NotConnected, "No
socket"))?;
- let mut pinned = Pin::new(&mut *socket);
- match pinned.as_mut().poll_read(cx, &mut self.recv_scratch)? {
+ match Pin::new(&mut *socket).poll_read(cx, buf)? {
Poll::Pending => return Ok(progress),
Poll::Ready(0) => {
- // EOF - connection was closed gracefully
- tracing::debug!("EOF detected, initiating graceful
disconnect");
self.inner.core.disconnect();
self.socket = None;
- // Complete all pending waiters with connection error
self.complete_all_waiters_with_error(IggyError::CannotEstablishConnection);
- return Ok(true); // Made progress by handling
disconnect
+ return Ok(true);
}
Poll::Ready(n) => n,
}
};
- self.recv_buffer.extend_from_slice(&self.recv_scratch[..n]);
+
+ unsafe { self.recv_buffer.advance_mut(n); }
+
self.process_incoming()?;
progress = true;
}
-
+
Ok(progress)
}
@@ -543,7 +539,7 @@ impl<S: AsyncIO> Future for ConnectionDriver<S> {
type Output = Result<(), io::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let st = &mut *self.0.state.lock().unwrap();
+ let st = &mut *self.0.state.lock();
let mut keep_going = false;
// 1) First process user commands
@@ -692,7 +688,7 @@ impl<S: AsyncIO + Send + Sync + 'static> NewTcpClient<S> {
async fn send_raw(&self, code: u32, payload: Bytes) -> Result<Bytes,
IggyError> {
let (wait_future, waker) = {
- let mut state = self.state.0.state.lock().unwrap();
+ let mut state = self.state.0.state.lock();
state.enqueue_message(code, payload)
};
// Wake outside of the lock to avoid deadlock
@@ -708,7 +704,7 @@ impl<S: AsyncIO + Debug + Send + Sync + 'static> Client for
NewTcpClient<S> {
async fn connect(&self) -> Result<(), IggyError> {
let address =
SocketAddr::from_str(&self.config.server_address).unwrap();
let (fut, waker) = {
- let mut state = self.state.0.state.lock().unwrap();
+ let mut state = self.state.0.state.lock();
state.enqueu_command(ClientCommand::Connect(address))
};
// Wake outside of the lock to avoid deadlock
@@ -721,7 +717,7 @@ impl<S: AsyncIO + Debug + Send + Sync + 'static> Client for
NewTcpClient<S> {
async fn disconnect(&self) -> Result<(), IggyError> {
let (fut, waker) = {
- let mut state = self.state.0.state.lock().unwrap();
+ let mut state = self.state.0.state.lock();
state.enqueu_command(ClientCommand::Disconnect)
};
// Wake outside of the lock to avoid deadlock
@@ -734,7 +730,7 @@ impl<S: AsyncIO + Debug + Send + Sync + 'static> Client for
NewTcpClient<S> {
async fn shutdown(&self) -> Result<(), IggyError> {
let (fut, waker) = {
- let mut state = self.state.0.state.lock().unwrap();
+ let mut state = self.state.0.state.lock();
state.enqueu_command(ClientCommand::Shutdown)
};
// Wake outside of the lock to avoid deadlock
@@ -789,7 +785,7 @@ impl<S: AsyncIO> Drop for NewTcpClient<S> {
// Use disconnect instead of shutdown to allow future reconnections
{
tracing::debug!("NewTcpClient Drop: Signaling disconnect to
driver");
- let mut state = self.state.0.state.lock().unwrap();
+ let mut state = self.state.0.state.lock();
state.inner.core.disconnect();
let waker = state.take_waker();
drop(state); // Drop lock early
@@ -805,6 +801,6 @@ impl<S: AsyncIO> Drop for NewTcpClient<S> {
}
}
-#[cfg(test)]
-mod tests;
+// #[cfg(test)]
+// mod tests;
diff --git a/results b/results
index 405f2a5a..87dfc7f0 100644
--- a/results
+++ b/results
@@ -1,34 +1,25 @@
-original
+origin mu
-2025-07-31T05:36:07.386650Z INFO bench_report::prints: Benchmark: Pinned
Producer And Consumer, 8 producers, 8 consumers, 8 streams, 1 topic per stream,
1 partitions per topic, 6400000 messages, 200 messages per batch, 32000 message
batches, 1000 bytes per message, 6.4GB of data processed
+2025-08-18T02:45:09.210113Z INFO bench_report::prints: Producers Results:
Total throughput: 1351.77 MB/s, 1351766 messages/s, average throughput per
Producer: 168.97 MB/s, p50 latency: 0.66 ms, p90 latency: 1.90 ms, p95 latency:
4.26 ms, p99 latency: 9.89 ms, p999 latency: 14.34 ms, p9999 latency: 78.54 ms,
average latency: 1.18 ms, median latency: 0.66 ms, min: 0.28 ms, max: 93.78 ms,
std dev: 1.53 ms, total time: 2.43 s
-2025-07-31T05:36:07.386667Z INFO bench_report::prints: Producers Results:
Total throughput: 1422.49 MB/s, 1422490 messages/s, average throughput per
Producer: 177.81 MB/s, p50 latency: 0.95 ms, p90 latency: 1.74 ms, p95 latency:
2.25 ms, p99 latency: 4.45 ms, p999 latency: 11.78 ms, p9999 latency: 31.65 ms,
average latency: 1.11 ms, median latency: 0.95 ms, min: 0.28 ms, max: 33.41 ms,
std dev: 0.52 ms, total time: 2.25 s
+2025-08-18T02:45:09.210122Z INFO bench_report::prints: Consumers Results:
Total throughput: 1122.59 MB/s, 1122586 messages/s, average throughput per
Consumer: 140.32 MB/s, p50 latency: 0.76 ms, p90 latency: 2.23 ms, p95 latency:
4.94 ms, p99 latency: 10.65 ms, p999 latency: 52.93 ms, p9999 latency: 88.92
ms, average latency: 1.41 ms, median latency: 0.76 ms, min: 0.33 ms, max: 93.86
ms, std dev: 2.58 ms, total time: 2.85 s
-2025-07-31T05:36:07.386673Z INFO bench_report::prints: Consumers Results:
Total throughput: 1421.86 MB/s, 1421858 messages/s, average throughput per
Consumer: 177.73 MB/s, p50 latency: 0.87 ms, p90 latency: 1.73 ms, p95 latency:
2.24 ms, p99 latency: 4.63 ms, p999 latency: 11.98 ms, p9999 latency: 32.95 ms,
average latency: 1.05 ms, median latency: 0.87 ms, min: 0.28 ms, max: 40.72 ms,
std dev: 0.85 ms, total time: 2.25 s
+2025-08-18T02:45:09.210209Z INFO bench_report::prints: Aggregate Results:
Total throughput: 2474.35 MB/s, 2474352 messages/s, average throughput per
Actor: 154.65 MB/s, p50 latency: 0.71 ms, p90 latency: 2.07 ms, p95 latency:
4.60 ms, p99 latency: 10.27 ms, p999 latency: 33.63 ms, p9999 latency: 83.73
ms, average latency: 1.29 ms, median latency: 0.71 ms, min: 0.28 ms, max: 93.86
ms, std dev: 2.54 ms, total time: 2.85 s
-2025-07-31T05:36:07.386677Z INFO bench_report::prints: Aggregate Results:
Total throughput: 2844.35 MB/s, 2844348 messages/s, average throughput per
Actor: 177.77 MB/s, p50 latency: 0.91 ms, p90 latency: 1.73 ms, p95 latency:
2.24 ms, p99 latency: 4.54 ms, p999 latency: 11.88 ms, p9999 latency: 32.30 ms,
average latency: 1.08 ms, median latency: 0.91 ms, min: 0.28 ms, max: 40.72 ms,
std dev: 0.82 ms, total time: 2.25 s
+------------------------------------------------------------------------------------------------------------
+parking_lot mu
+2025-08-18T02:49:38.057470Z INFO bench_report::prints: Producers Results:
Total throughput: 1562.26 MB/s, 1562260 messages/s, average throughput per
Producer: 195.28 MB/s, p50 latency: 0.60 ms, p90 latency: 1.53 ms, p95 latency:
3.55 ms, p99 latency: 8.94 ms, p999 latency: 13.70 ms, p9999 latency: 19.95 ms,
average latency: 1.01 ms, median latency: 0.60 ms, min: 0.28 ms, max: 14.94 ms,
std dev: 0.16 ms, total time: 2.13 s
-------------------------------------------------------------------------------------------------------------------------------------------------
+2025-08-18T02:49:38.057476Z INFO bench_report::prints: Consumers Results:
Total throughput: 1396.08 MB/s, 1396084 messages/s, average throughput per
Consumer: 174.51 MB/s, p50 latency: 0.67 ms, p90 latency: 1.60 ms, p95 latency:
3.62 ms, p99 latency: 9.27 ms, p999 latency: 14.32 ms, p9999 latency: 20.57 ms,
average latency: 1.09 ms, median latency: 0.67 ms, min: 0.26 ms, max: 15.46 ms,
std dev: 0.27 ms, total time: 2.22 s
-new
+2025-08-18T02:49:38.057516Z INFO bench_report::prints: Aggregate Results:
Total throughput: 2958.34 MB/s, 2958345 messages/s, average throughput per
Actor: 184.90 MB/s, p50 latency: 0.64 ms, p90 latency: 1.57 ms, p95 latency:
3.59 ms, p99 latency: 9.11 ms, p999 latency: 14.01 ms, p9999 latency: 20.26 ms,
average latency: 1.05 ms, median latency: 0.64 ms, min: 0.26 ms, max: 15.46 ms,
std dev: 0.22 ms, total time: 2.22 s
-2025-08-17T12:28:19.140828Z INFO bench_report::prints: Benchmark: Pinned
Producer And Consumer, 8 producers, 8 consumers, 8 streams, 1 topic per stream,
1 partitions per topic, 32000000 messages, 1000 messages per batch, 32000
message batches, 1000 bytes per message, 32GB of data processed
+zero copy
-2025-08-17T12:28:19.140848Z INFO bench_report::prints: Producers Results:
Total throughput: 1823.34 MB/s, 1823338 messages/s, average throughput per
Producer: 227.92 MB/s, p50 latency: 4.15 ms, p90 latency: 6.07 ms, p95 latency:
6.75 ms, p99 latency: 8.16 ms, p999 latency: 10.84 ms, p9999 latency: 16.72 ms,
average latency: 4.35 ms, median latency: 4.15 ms, min: 0.70 ms, max: 36.29 ms,
std dev: 0.23 ms, total time: 8.78 s
+2025-08-18T03:12:25.653912Z INFO bench_report::prints: Producers Results:
Total throughput: 1707.87 MB/s, 1707876 messages/s, average throughput per
Producer: 213.48 MB/s, p50 latency: 0.53 ms, p90 latency: 1.51 ms, p95 latency:
3.12 ms, p99 latency: 7.64 ms, p999 latency: 11.16 ms, p9999 latency: 16.51 ms,
average latency: 0.91 ms, median latency: 0.53 ms, min: 0.24 ms, max: 18.96 ms,
std dev: 0.28 ms, total time: 1.86 s
-2025-08-17T12:28:19.140853Z INFO bench_report::prints: Consumers Results:
Total throughput: 1819.61 MB/s, 1819613 messages/s, average throughput per
Consumer: 227.45 MB/s, p50 latency: 4.19 ms, p90 latency: 5.81 ms, p95 latency:
6.43 ms, p99 latency: 7.87 ms, p999 latency: 10.93 ms, p9999 latency: 20.03 ms,
average latency: 4.34 ms, median latency: 4.19 ms, min: 0.52 ms, max: 26.16 ms,
std dev: 0.29 ms, total time: 8.78 s
+2025-08-18T03:12:25.653918Z INFO bench_report::prints: Consumers Results:
Total throughput: 1704.61 MB/s, 1704612 messages/s, average throughput per
Consumer: 213.08 MB/s, p50 latency: 0.49 ms, p90 latency: 1.40 ms, p95 latency:
3.07 ms, p99 latency: 7.89 ms, p999 latency: 13.37 ms, p9999 latency: 17.70 ms,
average latency: 0.87 ms, median latency: 0.49 ms, min: 0.18 ms, max: 11.00 ms,
std dev: 0.27 ms, total time: 1.86 s
-2025-08-17T12:28:19.140857Z INFO bench_report::prints: Aggregate Results:
Total throughput: 3642.95 MB/s, 3642951 messages/s, average throughput per
Actor: 227.68 MB/s, p50 latency: 4.17 ms, p90 latency: 5.94 ms, p95 latency:
6.59 ms, p99 latency: 8.02 ms, p999 latency: 10.88 ms, p9999 latency: 18.37 ms,
average latency: 4.35 ms, median latency: 4.17 ms, min: 0.52 ms, max: 36.29 ms,
std dev: 0.27 ms, total time: 8.78 s
-
-------------------------------------------------------------
-
-old
-2025-08-17T12:31:11.021495Z INFO bench_report::prints: Benchmark: Pinned
Producer And Consumer, 8 producers, 8 consumers, 8 streams, 1 topic per stream,
1 partitions per topic, 32000000 messages, 1000 messages per batch, 32000
message batches, 1000 bytes per message, 32GB of data processed
-
-2025-08-17T12:31:11.021538Z INFO bench_report::prints: Producers Results:
Total throughput: 2214.78 MB/s, 2214730 messages/s, average throughput per
Producer: 276.85 MB/s, p50 latency: 2.77 ms, p90 latency: 4.02 ms, p95 latency:
4.46 ms, p99 latency: 5.59 ms, p999 latency: 15.81 ms, p9999 latency: 1078.97
ms, average latency: 3.57 ms, median latency: 2.77 ms, min: 0.50 ms, max:
680.17 ms, std dev: 8.48 ms, total time: 7.25 s
-
-2025-08-17T12:31:11.021553Z INFO bench_report::prints: Consumers Results:
Total throughput: 2216.73 MB/s, 2216677 messages/s, average throughput per
Consumer: 277.09 MB/s, p50 latency: 2.31 ms, p90 latency: 3.30 ms, p95 latency:
3.69 ms, p99 latency: 4.69 ms, p999 latency: 14.08 ms, p9999 latency: 1078.30
ms, average latency: 3.08 ms, median latency: 2.31 ms, min: 0.49 ms, max:
1248.46 ms, std dev: 13.95 ms, total time: 7.25 s
-
-2025-08-17T12:31:11.021564Z INFO bench_report::prints: Aggregate Results:
Total throughput: 4431.51 MB/s, 4431408 messages/s, average throughput per
Actor: 276.97 MB/s, p50 latency: 2.54 ms, p90 latency: 3.66 ms, p95 latency:
4.07 ms, p99 latency: 5.14 ms, p999 latency: 14.95 ms, p9999 latency: 1078.63
ms, average latency: 3.32 ms, median latency: 2.54 ms, min: 0.49 ms, max:
1248.46 ms, std dev: 11.17 ms, total time: 7.25 s
\ No newline at end of file
+2025-08-18T03:12:25.653987Z INFO bench_report::prints: Aggregate Results:
Total throughput: 3412.48 MB/s, 3412487 messages/s, average throughput per
Actor: 213.28 MB/s, p50 latency: 0.51 ms, p90 latency: 1.45 ms, p95 latency:
3.10 ms, p99 latency: 7.77 ms, p999 latency: 12.26 ms, p9999 latency: 17.10 ms,
average latency: 0.89 ms, median latency: 0.51 ms, min: 0.18 ms, max: 18.96 ms,
std dev: 0.27 ms, total time: 1.86 s
\ No newline at end of file