This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new 7090bc86e fix(io_uring): resolve clippy lints and add WebSocket to
benchmark tool (#2256)
7090bc86e is described below
commit 7090bc86e61c0e2735c6969e172428f0ccd9fc8c
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Oct 13 15:00:19 2025 +0200
fix(io_uring): resolve clippy lints and add WebSocket to benchmark tool
(#2256)
- Fix clippy warnings in websocket server code
- Add WebSocket transport support to iggy-bench (alias: ws)
- Update default WebSocket address to 8092
---
core/bench/src/args/common.rs | 1 +
core/bench/src/args/defaults.rs | 2 ++
core/bench/src/args/transport.rs | 42 ++++++++++++++++++++++
core/common/src/types/args/mod.rs | 4 +--
.../tests/cli/general/test_help_command.rs | 2 +-
core/server/src/configs/websocket.rs | 40 ++++++++++-----------
core/server/src/websocket/connection_handler.rs | 6 ++--
core/server/src/websocket/websocket_listener.rs | 3 +-
8 files changed, 72 insertions(+), 28 deletions(-)
diff --git a/core/bench/src/args/common.rs b/core/bench/src/args/common.rs
index a31972f42..6dd36b20c 100644
--- a/core/bench/src/args/common.rs
+++ b/core/bench/src/args/common.rs
@@ -360,6 +360,7 @@ impl IggyBenchArgs {
BenchmarkTransportCommand::Tcp(_) => "tcp",
BenchmarkTransportCommand::Quic(_) => "quic",
BenchmarkTransportCommand::Http(_) => "http",
+ BenchmarkTransportCommand::WebSocket(_) => "ws",
};
let actors = match &self.benchmark_kind {
diff --git a/core/bench/src/args/defaults.rs b/core/bench/src/args/defaults.rs
index ee2cbf8a4..7695782fb 100644
--- a/core/bench/src/args/defaults.rs
+++ b/core/bench/src/args/defaults.rs
@@ -29,6 +29,8 @@ pub const DEFAULT_QUIC_SERVER_ADDRESS: &str =
"127.0.0.1:8080";
pub const DEFAULT_QUIC_SERVER_NAME: &str = "localhost";
pub const DEFAULT_QUIC_VALIDATE_CERTIFICATE: bool = false;
+pub const DEFAULT_WEBSOCKET_SERVER_ADDRESS: &str = "127.0.0.1:8092";
+
pub const DEFAULT_MESSAGES_PER_BATCH: NonZeroU32 = u32!(1000);
pub const DEFAULT_MESSAGE_BATCHES: NonZeroU32 = u32!(1000);
pub const DEFAULT_MESSAGE_SIZE: NonZeroU32 = u32!(1000);
diff --git a/core/bench/src/args/transport.rs b/core/bench/src/args/transport.rs
index 7fa8ed42a..ab23ac356 100644
--- a/core/bench/src/args/transport.rs
+++ b/core/bench/src/args/transport.rs
@@ -19,6 +19,7 @@
use super::defaults::{
DEFAULT_HTTP_SERVER_ADDRESS, DEFAULT_QUIC_CLIENT_ADDRESS,
DEFAULT_QUIC_SERVER_ADDRESS,
DEFAULT_QUIC_SERVER_NAME, DEFAULT_QUIC_VALIDATE_CERTIFICATE,
DEFAULT_TCP_SERVER_ADDRESS,
+ DEFAULT_WEBSOCKET_SERVER_ADDRESS,
};
use super::{output::BenchmarkOutputCommand, props::BenchmarkTransportProps};
use clap::{Parser, Subcommand};
@@ -30,6 +31,8 @@ pub enum BenchmarkTransportCommand {
Http(HttpArgs),
Tcp(TcpArgs),
Quic(QuicArgs),
+ #[command(alias = "ws")]
+ WebSocket(WebSocketArgs),
}
impl Serialize for BenchmarkTransportCommand {
@@ -41,6 +44,7 @@ impl Serialize for BenchmarkTransportCommand {
Self::Http(_) => "http",
Self::Tcp(_) => "tcp",
Self::Quic(_) => "quic",
+ Self::WebSocket(_) => "websocket",
};
serializer.serialize_str(variant_str)
}
@@ -72,6 +76,7 @@ impl BenchmarkTransportProps for BenchmarkTransportCommand {
Self::Http(args) => args,
Self::Tcp(args) => args,
Self::Quic(args) => args,
+ Self::WebSocket(args) => args,
}
}
@@ -222,3 +227,40 @@ impl BenchmarkTransportProps for QuicArgs {
self.output.as_ref()
}
}
+
+#[derive(Parser, Debug, Clone)]
+pub struct WebSocketArgs {
+ /// Address of the WebSocket iggy-server
+ #[arg(long, default_value_t = DEFAULT_WEBSOCKET_SERVER_ADDRESS.to_owned())]
+ pub server_address: String,
+
+ /// Optional output command, used to output results (charts, raw json
data) to a directory
+ #[command(subcommand)]
+ pub output: Option<BenchmarkOutputCommand>,
+}
+
+impl BenchmarkTransportProps for WebSocketArgs {
+ fn transport(&self) -> &TransportProtocol {
+ &TransportProtocol::WebSocket
+ }
+
+ fn server_address(&self) -> &str {
+ &self.server_address
+ }
+
+ fn validate_certificate(&self) -> bool {
+ panic!("Cannot validate certificate for WebSocket transport!")
+ }
+
+ fn client_address(&self) -> &str {
+ panic!("Setting client address for WebSocket transport is not
supported!")
+ }
+
+ fn nodelay(&self) -> bool {
+ panic!("Setting nodelay for WebSocket transport is not supported!")
+ }
+
+ fn output_command(&self) -> Option<&BenchmarkOutputCommand> {
+ self.output.as_ref()
+ }
+}
diff --git a/core/common/src/types/args/mod.rs
b/core/common/src/types/args/mod.rs
index bba968d24..e4e1a56f1 100644
--- a/core/common/src/types/args/mod.rs
+++ b/core/common/src/types/args/mod.rs
@@ -201,7 +201,7 @@ pub struct ArgsOptional {
/// The optional server address for the WebSocket transport
///
- /// [default: 127.0.0.1:8095]
+ /// [default: 127.0.0.1:8092]
#[arg(long)]
#[serde(skip_serializing_if = "Option::is_none")]
pub websocket_server_address: Option<String>,
@@ -403,7 +403,7 @@ impl Default for Args {
quic_max_idle_timeout: 10000,
quic_validate_certificate: false,
quic_heartbeat_interval: "5s".to_string(),
- websocket_server_address: "127.0.0.1:8095".to_string(),
+ websocket_server_address: "127.0.0.1:8092".to_string(),
websocket_reconnection_enabled: true,
websocket_reconnection_max_retries: None,
websocket_reconnection_interval: "1s".to_string(),
diff --git a/core/integration/tests/cli/general/test_help_command.rs
b/core/integration/tests/cli/general/test_help_command.rs
index 623b546e1..9ae5fcb95 100644
--- a/core/integration/tests/cli/general/test_help_command.rs
+++ b/core/integration/tests/cli/general/test_help_command.rs
@@ -177,7 +177,7 @@ Options:
--websocket-server-address <WEBSOCKET_SERVER_ADDRESS>
The optional server address for the WebSocket transport
{CLAP_INDENT}
- [default: 127.0.0.1:8095]
+ [default: 127.0.0.1:8092]
--websocket-reconnection-max-retries <WEBSOCKET_RECONNECTION_MAX_RETRIES>
The optional number of max reconnect retries for the WebSocket
transport
diff --git a/core/server/src/configs/websocket.rs
b/core/server/src/configs/websocket.rs
index 6d8a5b95b..7841edfbe 100644
--- a/core/server/src/configs/websocket.rs
+++ b/core/server/src/configs/websocket.rs
@@ -53,34 +53,34 @@ impl WebSocketConfig {
let mut config = TungsteniteConfig::default();
- if let Some(read_buf_size_str) = &self.read_buffer_size {
- if let Ok(byte_size) = read_buf_size_str.parse::<IggyByteSize>() {
- config = config.read_buffer_size(byte_size.as_bytes_u64() as
usize);
- }
+ if let Some(read_buf_size_str) = &self.read_buffer_size
+ && let Ok(byte_size) = read_buf_size_str.parse::<IggyByteSize>()
+ {
+ config = config.read_buffer_size(byte_size.as_bytes_u64() as
usize);
}
- if let Some(write_buf_size_str) = &self.write_buffer_size {
- if let Ok(byte_size) = write_buf_size_str.parse::<IggyByteSize>() {
- config = config.write_buffer_size(byte_size.as_bytes_u64() as
usize);
- }
+ if let Some(write_buf_size_str) = &self.write_buffer_size
+ && let Ok(byte_size) = write_buf_size_str.parse::<IggyByteSize>()
+ {
+ config = config.write_buffer_size(byte_size.as_bytes_u64() as
usize);
}
- if let Some(max_write_buf_size_str) = &self.max_write_buffer_size {
- if let Ok(byte_size) =
max_write_buf_size_str.parse::<IggyByteSize>() {
- config = config.max_write_buffer_size(byte_size.as_bytes_u64()
as usize);
- }
+ if let Some(max_write_buf_size_str) = &self.max_write_buffer_size
+ && let Ok(byte_size) =
max_write_buf_size_str.parse::<IggyByteSize>()
+ {
+ config = config.max_write_buffer_size(byte_size.as_bytes_u64() as
usize);
}
- if let Some(msg_size_str) = &self.max_message_size {
- if let Ok(byte_size) = msg_size_str.parse::<IggyByteSize>() {
- config = config.max_message_size(Some(byte_size.as_bytes_u64()
as usize));
- }
+ if let Some(msg_size_str) = &self.max_message_size
+ && let Ok(byte_size) = msg_size_str.parse::<IggyByteSize>()
+ {
+ config = config.max_message_size(Some(byte_size.as_bytes_u64() as
usize));
}
- if let Some(frame_size_str) = &self.max_frame_size {
- if let Ok(byte_size) = frame_size_str.parse::<IggyByteSize>() {
- config = config.max_frame_size(Some(byte_size.as_bytes_u64()
as usize));
- }
+ if let Some(frame_size_str) = &self.max_frame_size
+ && let Ok(byte_size) = frame_size_str.parse::<IggyByteSize>()
+ {
+ config = config.max_frame_size(Some(byte_size.as_bytes_u64() as
usize));
}
config = config.accept_unmasked_frames(self.accept_unmasked_frames);
diff --git a/core/server/src/websocket/connection_handler.rs
b/core/server/src/websocket/connection_handler.rs
index 8f100d8c2..33f7f26b5 100644
--- a/core/server/src/websocket/connection_handler.rs
+++ b/core/server/src/websocket/connection_handler.rs
@@ -69,14 +69,14 @@ pub(crate) async fn handle_connection(
let length =
u32::from_le_bytes(initial_buffer[0..INITIAL_BYTES_LENGTH].try_into().unwrap());
let (res, mut code_buffer_out) = sender.read(code_buffer).await;
- let _ = res?;
+ res?;
let code: u32 =
u32::from_le_bytes(code_buffer_out[0..INITIAL_BYTES_LENGTH].try_into().unwrap());
initial_buffer.clear();
code_buffer_out.clear();
- length_buffer = BytesMut::from(initial_buffer);
- code_buffer = BytesMut::from(code_buffer_out);
+ length_buffer = initial_buffer;
+ code_buffer = code_buffer_out;
debug!("Received a WebSocket request, length: {length}, code: {code}");
let command = ServerCommand::from_code_and_reader(code, sender, length
- 4).await?;
diff --git a/core/server/src/websocket/websocket_listener.rs
b/core/server/src/websocket/websocket_listener.rs
index 6d884750b..b3fc5969f 100644
--- a/core/server/src/websocket/websocket_listener.rs
+++ b/core/server/src/websocket/websocket_listener.rs
@@ -92,7 +92,6 @@ async fn accept_loop(
) -> Result<(), IggyError> {
loop {
let shard = shard.clone();
- let ws_config = ws_config.clone();
let accept_future = listener.accept();
futures::select! {
@@ -110,7 +109,7 @@ async fn accept_loop(
shard_info!(shard.id, "Accepted new WebSocket
connection from: {}", remote_addr);
let shard_clone = shard.clone();
- let ws_config_clone = ws_config.clone();
+ let ws_config_clone = ws_config;
let registry = shard.task_registry.clone();
let registry_clone = registry.clone();