This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new 68ed9c627 fix(io_uring): enable message flush test, update websocket cli docs (#2255)
68ed9c627 is described below
commit 68ed9c6276a683acef154f3eaaaec1986b0d2fbe
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Oct 13 13:43:13 2025 +0200
fix(io_uring): enable message flush test, update websocket cli docs (#2255)
---
core/common/src/types/args/mod.rs | 4 ++--
core/integration/tests/cli/general/test_help_command.rs | 17 ++++++++++++++++-
.../tests/cli/message/test_message_flush_command.rs | 1 -
core/server/src/slab/streams.rs | 14 ++++++++++++++
4 files changed, 32 insertions(+), 4 deletions(-)
diff --git a/core/common/src/types/args/mod.rs b/core/common/src/types/args/mod.rs
index 220be6e16..bba968d24 100644
--- a/core/common/src/types/args/mod.rs
+++ b/core/common/src/types/args/mod.rs
@@ -28,7 +28,7 @@ use serde::{Deserialize, Serialize};
#[derive(Parser, Debug, Clone, Deserialize, Serialize, Default)]
#[command(author, version, about, long_about = None)]
pub struct ArgsOptional {
- /// The transport to use. Valid values are `quic`, `http` and `tcp`
+ /// The transport to use. Valid values are `quic`, `http`, `tcp` and `ws`
///
/// [default: tcp]
#[arg(long)]
@@ -224,7 +224,7 @@ pub struct ArgsOptional {
/// The arguments used by the `ClientProviderConfig` to create a client.
#[derive(Debug, Clone)]
pub struct Args {
- /// The transport to use. Valid values are `quic`, `http` and `tcp`
+ /// The transport to use. Valid values are `quic`, `http`, `tcp` and `ws`
pub transport: String,
/// Optional encryption key for the message payload used by the client
diff --git a/core/integration/tests/cli/general/test_help_command.rs b/core/integration/tests/cli/general/test_help_command.rs
index 8481bc456..623b546e1 100644
--- a/core/integration/tests/cli/general/test_help_command.rs
+++ b/core/integration/tests/cli/general/test_help_command.rs
@@ -54,7 +54,7 @@ Commands:
Options:
--transport <TRANSPORT>
- The transport to use. Valid values are `quic`, `http` and `tcp`
+ The transport to use. Valid values are `quic`, `http`, `tcp` and `ws`
{CLAP_INDENT}
[default: tcp]
@@ -174,6 +174,21 @@ Options:
--quic-validate-certificate
Flag to enable certificate validation for QUIC
+ --websocket-server-address <WEBSOCKET_SERVER_ADDRESS>
+ The optional server address for the WebSocket transport
+{CLAP_INDENT}
+ [default: 127.0.0.1:8095]
+
+ --websocket-reconnection-max-retries <WEBSOCKET_RECONNECTION_MAX_RETRIES>
+ The optional number of max reconnect retries for the WebSocket transport
+{CLAP_INDENT}
+ [default: 3]
+
+ --websocket-reconnection-interval <WEBSOCKET_RECONNECTION_INTERVAL>
+ The optional reconnect interval for the WebSocket transport
+{CLAP_INDENT}
+ [default: "1s"]
+
-q, --quiet
Quiet mode (disabled stdout printing)
diff --git a/core/integration/tests/cli/message/test_message_flush_command.rs b/core/integration/tests/cli/message/test_message_flush_command.rs
index 3531bd7cf..83541b8cd 100644
--- a/core/integration/tests/cli/message/test_message_flush_command.rs
+++ b/core/integration/tests/cli/message/test_message_flush_command.rs
@@ -169,7 +169,6 @@ impl IggyCmdTestCase for TestMessageFetchCmd {
#[tokio::test]
#[parallel]
-#[ignore = "flush_unsaved_buffer not yet implemented (todo!)"]
pub async fn should_be_successful() {
let mut iggy_cmd_test = IggyCmdTest::default();
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 694d925b0..bb639dade 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1249,6 +1249,20 @@ impl Streams {
) -> Result<u32, IggyError> {
let batch_count = batches.count();
let batch_size = batches.size();
+
+ if batch_count == 0 {
+ return Ok(0);
+ }
+
+ let has_segments =
+ self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
+ log.has_segments()
+ });
+
+ if !has_segments {
+ return Ok(0);
+ }
+
// Extract storage before async operations
let (messages_writer, index_writer) =
self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {