This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc by this push:
     new 68ed9c627 fix(io_uring): enable message flush test, update websocket cli docs (#2255)
68ed9c627 is described below

commit 68ed9c6276a683acef154f3eaaaec1986b0d2fbe
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Oct 13 13:43:13 2025 +0200

    fix(io_uring): enable message flush test, update websocket cli docs (#2255)
---
 core/common/src/types/args/mod.rs                       |  4 ++--
 core/integration/tests/cli/general/test_help_command.rs | 17 ++++++++++++++++-
 .../tests/cli/message/test_message_flush_command.rs     |  1 -
 core/server/src/slab/streams.rs                         | 14 ++++++++++++++
 4 files changed, 32 insertions(+), 4 deletions(-)

diff --git a/core/common/src/types/args/mod.rs b/core/common/src/types/args/mod.rs
index 220be6e16..bba968d24 100644
--- a/core/common/src/types/args/mod.rs
+++ b/core/common/src/types/args/mod.rs
@@ -28,7 +28,7 @@ use serde::{Deserialize, Serialize};
 #[derive(Parser, Debug, Clone, Deserialize, Serialize, Default)]
 #[command(author, version, about, long_about = None)]
 pub struct ArgsOptional {
-    /// The transport to use. Valid values are `quic`, `http` and `tcp`
+    /// The transport to use. Valid values are `quic`, `http`, `tcp` and `ws`
     ///
     /// [default: tcp]
     #[arg(long)]
@@ -224,7 +224,7 @@ pub struct ArgsOptional {
 /// The arguments used by the `ClientProviderConfig` to create a client.
 #[derive(Debug, Clone)]
 pub struct Args {
-    /// The transport to use. Valid values are `quic`, `http` and `tcp`
+    /// The transport to use. Valid values are `quic`, `http`, `tcp` and `ws`
     pub transport: String,
 
     /// Optional encryption key for the message payload used by the client
diff --git a/core/integration/tests/cli/general/test_help_command.rs b/core/integration/tests/cli/general/test_help_command.rs
index 8481bc456..623b546e1 100644
--- a/core/integration/tests/cli/general/test_help_command.rs
+++ b/core/integration/tests/cli/general/test_help_command.rs
@@ -54,7 +54,7 @@ Commands:
 
 Options:
       --transport <TRANSPORT>
-          The transport to use. Valid values are `quic`, `http` and `tcp`
+          The transport to use. Valid values are `quic`, `http`, `tcp` and `ws`
 {CLAP_INDENT}
           [default: tcp]
 
@@ -174,6 +174,21 @@ Options:
       --quic-validate-certificate
           Flag to enable certificate validation for QUIC
 
+      --websocket-server-address <WEBSOCKET_SERVER_ADDRESS>
+          The optional server address for the WebSocket transport
+{CLAP_INDENT}
+          [default: 127.0.0.1:8095]
+
+      --websocket-reconnection-max-retries <WEBSOCKET_RECONNECTION_MAX_RETRIES>
+          The optional number of max reconnect retries for the WebSocket transport
+{CLAP_INDENT}
+          [default: 3]
+
+      --websocket-reconnection-interval <WEBSOCKET_RECONNECTION_INTERVAL>
+          The optional reconnect interval for the WebSocket transport
+{CLAP_INDENT}
+          [default: "1s"]
+
   -q, --quiet
           Quiet mode (disabled stdout printing)
 
diff --git a/core/integration/tests/cli/message/test_message_flush_command.rs b/core/integration/tests/cli/message/test_message_flush_command.rs
index 3531bd7cf..83541b8cd 100644
--- a/core/integration/tests/cli/message/test_message_flush_command.rs
+++ b/core/integration/tests/cli/message/test_message_flush_command.rs
@@ -169,7 +169,6 @@ impl IggyCmdTestCase for TestMessageFetchCmd {
 
 #[tokio::test]
 #[parallel]
-#[ignore = "flush_unsaved_buffer not yet implemented (todo!)"]
 pub async fn should_be_successful() {
     let mut iggy_cmd_test = IggyCmdTest::default();
 
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 694d925b0..bb639dade 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1249,6 +1249,20 @@ impl Streams {
     ) -> Result<u32, IggyError> {
         let batch_count = batches.count();
         let batch_size = batches.size();
+
+        if batch_count == 0 {
+            return Ok(0);
+        }
+
+        let has_segments =
+            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
+                log.has_segments()
+            });
+
+        if !has_segments {
+            return Ok(0);
+        }
+
         // Extract storage before async operations
         let (messages_writer, index_writer) =
             self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {

Reply via email to