This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch fix_rust_sdk_reconnection
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 861a948125ef92eac23460d1fafc5181be7cbab0
Author: spetz <[email protected]>
AuthorDate: Thu Mar 5 19:48:28 2026 +0100

    fix(rust): auto-reconnect on transport errors after server restart
---
 Cargo.lock                                         |   2 +-
 Cargo.toml                                         |  57 +----
 DEPENDENCIES.md                                    |   2 +-
 core/ai/mcp/Cargo.toml                             |   6 +-
 core/integration/Cargo.toml                        |   7 +-
 core/integration/tests/server/scenarios/mod.rs     |   1 +
 .../scenarios/reconnect_after_restart_scenario.rs  | 257 +++++++++++++++++++++
 core/integration/tests/server/specific.rs          |  32 ++-
 core/sdk/Cargo.toml                                |   2 +-
 core/sdk/src/quic/quic_client.rs                   |   8 +-
 core/sdk/src/tcp/tcp_client.rs                     |   1 +
 core/sdk/src/websocket/websocket_client.rs         |   6 +
 12 files changed, 314 insertions(+), 67 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 9bd245b86..23b34e3a8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5011,7 +5011,7 @@ checksum = 
"cd62e6b5e86ea8eeeb8db1de02880a6abc01a397b2ebb64b5d74ac255318f5cb"
 
 [[package]]
 name = "iggy"
-version = "0.9.1-edge.1"
+version = "0.9.1-edge.2"
 dependencies = [
  "async-broadcast",
  "async-dropper",
diff --git a/Cargo.toml b/Cargo.toml
index d2bbfe567..1f7af68e2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -96,19 +96,7 @@ clap_complete = "4.5.66"
 clock = { path = "core/clock" }
 colored = "3.1.1"
 comfy-table = "7.2.2"
-compio = { version = "0.18.0", features = [
-    "runtime",
-    "macros",
-    "io-uring",
-    "time",
-    "rustls",
-    "ring",
-    "net",
-    "quic",
-    "tls",
-    "ws",
-    "fs",
-] }
+compio = { version = "0.18.0", features = ["runtime", "macros", "io-uring", 
"time", "rustls", "ring", "net", "quic", "tls", "ws", "fs"] }
 # Pin compio-driver >= 0.11.2 to fix musl compilation (compio-rs/compio#668)
 compio-driver = "0.11.2"
 configs = { path = "core/configs", version = "0.1.0" }
@@ -157,7 +145,7 @@ humantime = "2.3.0"
 hwlocality = "1.0.0-alpha.11"
 iceberg = "0.8.0"
 iceberg-catalog-rest = "0.8.0"
-iggy = { path = "core/sdk", version = "0.9.1-edge.1" }
+iggy = { path = "core/sdk", version = "0.9.1-edge.2" }
 iggy_binary_protocol = { path = "core/binary_protocol", version = 
"0.9.1-edge.1" }
 iggy_common = { path = "core/common", version = "0.9.1-edge.1" }
 iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.2.1-edge.1" }
@@ -185,22 +173,9 @@ octocrab = "0.49.5"
 once_cell = "1.21.3"
 opentelemetry = { version = "0.31.0", features = ["trace", "logs"] }
 opentelemetry-appender-tracing = { version = "0.31.1", features = ["log"] }
-opentelemetry-otlp = { version = "0.31.0", features = [
-    "logs",
-    "trace",
-    "grpc-tonic",
-    "http",
-    "http-proto",
-    "reqwest-client",
-] }
+opentelemetry-otlp = { version = "0.31.0", features = ["logs", "trace", 
"grpc-tonic", "http", "http-proto", "reqwest-client"] }
 opentelemetry-semantic-conventions = "0.31.0"
-opentelemetry_sdk = { version = "0.31.0", features = [
-    "logs",
-    "trace",
-    "experimental_async_runtime",
-    "experimental_logs_batch_log_processor_with_async_runtime",
-    "experimental_trace_batch_span_processor_with_async_runtime",
-] }
+opentelemetry_sdk = { version = "0.31.0", features = ["logs", "trace", 
"experimental_async_runtime", 
"experimental_logs_batch_log_processor_with_async_runtime", 
"experimental_trace_batch_span_processor_with_async_runtime"] }
 papaya = "0.2.3"
 parquet = "57.3.0"
 partitions = { path = "core/partitions" }
@@ -245,13 +220,7 @@ server = { path = "core/server" }
 simd-json = { version = "0.17.0", features = ["serde_impl"] }
 slab = "0.4.12"
 socket2 = "0.6.2"
-sqlx = { version = "0.8.6", features = [
-    "runtime-tokio-rustls",
-    "postgres",
-    "chrono",
-    "uuid",
-    "json",
-] }
+sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "postgres", 
"chrono", "uuid", "json"] }
 static-toml = "1.3.0"
 strum = { version = "0.27.2", features = ["derive"] }
 strum_macros = "0.27.2"
@@ -270,11 +239,7 @@ tower-http = { version = "0.6.8", features = 
["add-extension", "cors", "trace"]
 tracing = "0.1.44"
 tracing-appender = "0.2.4"
 tracing-opentelemetry = "0.32.1"
-tracing-subscriber = { version = "0.3.22", default-features = false, features 
= [
-    "fmt",
-    "env-filter",
-    "ansi",
-] }
+tracing-subscriber = { version = "0.3.22", default-features = false, features 
= ["fmt", "env-filter", "ansi"] }
 trait-variant = "0.1.2"
 tungstenite = "0.28.0"
 twox-hash = { version = "2.1.2", features = ["xxhash32"] }
@@ -283,15 +248,7 @@ uuid = { version = "1.20.0", features = ["v4", "v7", 
"fast-rng", "serde", "zeroc
 vergen-git2 = { version = "9.1.0", features = ["build", "cargo", "rustc", 
"si"] }
 walkdir = "2.5.0"
 wasm-bindgen = "0.2"
-web-sys = { version = "0.3", features = [
-    "Window",
-    "Location",
-    "HtmlSelectElement",
-    "Clipboard",
-    "Navigator",
-    "ResizeObserver",
-    "ResizeObserverEntry",
-] }
+web-sys = { version = "0.3", features = ["Window", "Location", 
"HtmlSelectElement", "Clipboard", "Navigator", "ResizeObserver", 
"ResizeObserverEntry"] }
 webpki-roots = "1.0.6"
 yew = { version = "0.22", features = ["csr"] }
 yew-router = "0.19"
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index e3f450a7e..31c1302ff 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -439,7 +439,7 @@ ident_case: 1.0.1, "Apache-2.0 OR MIT",
 idna: 1.1.0, "Apache-2.0 OR MIT",
 idna_adapter: 1.2.1, "Apache-2.0 OR MIT",
 if_chain: 1.0.3, "Apache-2.0 OR MIT",
-iggy: 0.9.1-edge.1, "Apache-2.0",
+iggy: 0.9.1-edge.2, "Apache-2.0",
 iggy-bench: 0.4.1-edge.1, "Apache-2.0",
 iggy-bench-dashboard-server: 0.6.2-edge.1, "Apache-2.0",
 iggy-cli: 0.11.1-edge.1, "Apache-2.0",
diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml
index 78046be4e..b2a6f40bb 100644
--- a/core/ai/mcp/Cargo.toml
+++ b/core/ai/mcp/Cargo.toml
@@ -42,11 +42,7 @@ opentelemetry-otlp = { workspace = true }
 opentelemetry-semantic-conventions = { workspace = true }
 opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] }
 reqwest = { workspace = true }
-rmcp = { workspace = true, features = [
-    "server",
-    "transport-io",
-    "transport-streamable-http-server",
-] }
+rmcp = { workspace = true, features = ["server", "transport-io", 
"transport-streamable-http-server"] }
 serde = { workspace = true }
 serde_json = { workspace = true }
 socket2 = "0.6"
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index 13afb04dc..c76b6fe7f 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -54,12 +54,7 @@ rcgen = { workspace = true }
 reqwest = { workspace = true }
 reqwest-middleware = { workspace = true }
 reqwest-retry = { workspace = true }
-rmcp = { workspace = true, features = [
-    "client",
-    "reqwest",
-    "transport-streamable-http-client",
-    "transport-streamable-http-client-reqwest",
-] }
+rmcp = { workspace = true, features = ["client", "reqwest", 
"transport-streamable-http-client", "transport-streamable-http-client-reqwest"] 
}
 serde = { workspace = true }
 serde_json = { workspace = true }
 serial_test = { workspace = true }
diff --git a/core/integration/tests/server/scenarios/mod.rs 
b/core/integration/tests/server/scenarios/mod.rs
index 5a3b3f7a4..fb510234f 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -38,6 +38,7 @@ pub mod offset_scenario;
 pub mod permissions_scenario;
 pub mod purge_delete_scenario;
 pub mod read_during_persistence_scenario;
+pub mod reconnect_after_restart_scenario;
 pub mod segment_rotation_race_scenario;
 pub mod single_message_per_batch_scenario;
 pub mod snapshot_scenario;
diff --git 
a/core/integration/tests/server/scenarios/reconnect_after_restart_scenario.rs 
b/core/integration/tests/server/scenarios/reconnect_after_restart_scenario.rs
new file mode 100644
index 000000000..67e8cc632
--- /dev/null
+++ 
b/core/integration/tests/server/scenarios/reconnect_after_restart_scenario.rs
@@ -0,0 +1,257 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use futures::StreamExt;
+use iggy::prelude::*;
+use iggy_common::TransportProtocol;
+use integration::harness::{TestBinary, TestHarness};
+use std::str::FromStr;
+use tokio::time::{Duration, sleep, timeout};
+
+const STREAM_NAME: &str = "test-reconnect-stream";
+const TOPIC_NAME: &str = "test-reconnect-topic";
+
+/// Builds a connection string for the harness's configured client transport.
+///
+/// The string uses the `iggy:iggy` credentials and a very long heartbeat
+/// interval (the reason is explained next to the `format!` below).
+///
+/// # Panics
+///
+/// Panics if the harness has no transport configured, if the server does not
+/// expose an address for that transport, or if the transport is HTTP, which is
+/// stateless and has no connection to re-establish.
+fn connection_string(harness: &TestHarness) -> String {
+    let transport = harness.transport().expect("No transport configured");
+    let server = harness.server();
+
+    // Resolve the URL scheme and the address in a single match so each
+    // transport is handled in exactly one place (no `unreachable!()` arm that
+    // has to be kept in sync with a second match).
+    let (protocol_prefix, addr) = match transport {
+        TransportProtocol::Tcp => (
+            "iggy",
+            server.tcp_addr().expect("TCP address not available"),
+        ),
+        TransportProtocol::Quic => (
+            "iggy+quic",
+            server.quic_addr().expect("QUIC address not available"),
+        ),
+        TransportProtocol::WebSocket => (
+            "iggy+ws",
+            server
+                .websocket_addr()
+                .expect("WebSocket address not available"),
+        ),
+        TransportProtocol::Http => panic!("HTTP is stateless and does not support reconnect"),
+    };
+
+    // Use a very long heartbeat so the client's send/poll is the first operation
+    // to hit the broken connection, exercising the write-path error handling
+    // rather than the heartbeat's read-based detection.
+    format!("{protocol_prefix}://iggy:iggy@{addr}?heartbeat_interval=5min")
+}
+
+/// Verifies that the producer can auto-reconnect and resume sending messages
+/// after the server is stopped and restarted.
+///
+/// The test runs in these steps:
+/// 1. Send one message while the server is up.
+/// 2. Stop the server.
+/// 3. Spawn a second send while the server is down.
+/// 4. Start the server again.
+///
+/// The spawned send must succeed; the producer is configured with 10 retries,
+/// 2 s apart. The message sent across the restart must then be readable from
+/// the server.
+///
+/// # Panics
+///
+/// Panics (failing the test) in any of these cases:
+/// - a setup step fails;
+/// - the send across the restart fails or does not finish within 60 s;
+/// - the message it sent cannot be polled back.
+pub async fn run_producer(harness: &mut TestHarness) {
+    let conn_str = connection_string(harness);
+
+    let client = IggyClient::from_connection_string(&conn_str)
+        .expect("Failed to create client from connection string");
+    Client::connect(&client).await.expect("Failed to connect");
+
+    let producer = client
+        .producer(STREAM_NAME, TOPIC_NAME)
+        .expect("Failed to create producer builder")
+        .create_stream_if_not_exists()
+        .create_topic_if_not_exists(
+            1,
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .send_retries(Some(10), Some(IggyDuration::from_str("2s").unwrap()))
+        .build();
+
+    producer
+        .init()
+        .await
+        .expect("Failed to initialize producer");
+
+    let msg = IggyMessage::from_str("before-restart").unwrap();
+    producer
+        .send(vec![msg])
+        .await
+        .expect("Initial send should succeed");
+
+    // Stop the server and try to send while it is down — this forces an error
+    // on the write path. Then start the server back up so the SDK's reconnect
+    // logic can re-establish the connection.
+    harness.server_mut().stop().expect("Failed to stop server");
+    sleep(Duration::from_secs(2)).await;
+
+    let send_handle = tokio::spawn(async move {
+        let msg = IggyMessage::from_str("after-restart").unwrap();
+        producer.send(vec![msg]).await
+    });
+
+    sleep(Duration::from_secs(1)).await;
+    harness
+        .server_mut()
+        .start()
+        .expect("Failed to start server");
+
+    let send_result = timeout(Duration::from_secs(60), send_handle)
+        .await
+        .expect("Timed out waiting for send after server restart")
+        .expect("Send task panicked");
+    send_result.expect("Send after server restart should succeed");
+
+    let poll_client = harness
+        .root_client()
+        .await
+        .expect("Failed to create polling client");
+
+    let polled = poll_client
+        .poll_messages(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            Some(0),
+            &Consumer::default(),
+            &PollingStrategy::offset(0),
+            10,
+            false,
+        )
+        .await
+        .expect("Failed to poll messages after restart");
+
+    // A plain non-empty check would already pass on the "before-restart"
+    // message alone. Look specifically for the payload that was sent across
+    // the restart to prove it actually reached the server.
+    let after_restart_stored = polled
+        .messages
+        .iter()
+        .any(|message| &message.payload[..] == b"after-restart");
+    assert!(
+        after_restart_stored,
+        "Message sent across the server restart was not stored (polled {} message(s))",
+        polled.messages.len()
+    );
+}
+
+/// Verifies that the consumer can auto-reconnect and resume polling messages
+/// after the server is stopped and restarted.
+///
+/// The test runs in these steps:
+/// 1. Seed three messages and consume them.
+/// 2. Restart the server.
+/// 3. Send three more messages through a fresh root client.
+///
+/// The original consumer, which is not rebuilt, must then receive three more
+/// messages within 30 s.
+///
+/// # Panics
+///
+/// Panics (failing the test) if any setup step fails or if the expected
+/// number of messages is not consumed before or after the restart.
+pub async fn run_consumer(harness: &mut TestHarness) {
+    let conn_str = connection_string(harness);
+
+    // Seed the stream/topic with messages before creating the consumer.
+    let setup_client = harness
+        .root_client()
+        .await
+        .expect("Failed to create setup client");
+
+    setup_client
+        .create_stream(STREAM_NAME)
+        .await
+        .expect("Failed to create stream");
+    setup_client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            1,
+            Default::default(),
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .expect("Failed to create topic");
+
+    for i in 0..3u32 {
+        let msg = IggyMessage::from_str(&format!("pre-restart-{i}")).unwrap();
+        setup_client
+            .send_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                &Partitioning::partition_id(0),
+                &mut [msg],
+            )
+            .await
+            .expect("Failed to send seed message");
+    }
+    drop(setup_client);
+
+    // Create the consumer client with a long heartbeat.
+    let client = IggyClient::from_connection_string(&conn_str)
+        .expect("Failed to create client from connection string");
+    Client::connect(&client).await.expect("Failed to connect");
+
+    let mut consumer = client
+        .consumer("test-consumer", STREAM_NAME, TOPIC_NAME, 0)
+        .expect("Failed to create consumer builder")
+        .polling_strategy(PollingStrategy::next())
+        .batch_length(10)
+        .poll_interval(IggyDuration::from_str("100ms").unwrap())
+        .polling_retry_interval(IggyDuration::from_str("500ms").unwrap())
+        .build();
+
+    consumer
+        .init()
+        .await
+        .expect("Failed to initialize consumer");
+
+    // Consume the pre-restart messages to confirm the consumer works.
+    let consumed_before = consume_messages(&mut consumer, 3, Duration::from_secs(10)).await;
+    assert_eq!(
+        consumed_before, 3,
+        "Should consume all pre-restart messages"
+    );
+
+    // Stop the server, wait for the connection to break, then restart.
+    harness.server_mut().stop().expect("Failed to stop server");
+    sleep(Duration::from_secs(2)).await;
+    harness
+        .server_mut()
+        .start()
+        .expect("Failed to start server");
+
+    // Send new messages after the server is back up.
+    let post_client = harness
+        .root_client()
+        .await
+        .expect("Failed to create post-restart client");
+    for i in 0..3u32 {
+        let msg = IggyMessage::from_str(&format!("post-restart-{i}")).unwrap();
+        post_client
+            .send_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                &Partitioning::partition_id(0),
+                &mut [msg],
+            )
+            .await
+            .expect("Failed to send post-restart message");
+    }
+
+    // The consumer should auto-reconnect and resume polling.
+    let consumed_after = consume_messages(&mut consumer, 3, Duration::from_secs(30)).await;
+    assert_eq!(
+        consumed_after, 3,
+        "Should consume all post-restart messages after reconnect"
+    );
+}
+
+/// Pulls items from `consumer` until `expected` messages have been received
+/// or `within` has elapsed, and returns how many messages were received.
+///
+/// Each wait for the next item is capped at 500 ms so the deadline is
+/// re-checked regularly. Errors yielded by the stream are skipped so that
+/// transient failures around the restart do not end the wait early.
+async fn consume_messages<S>(consumer: &mut S, expected: u32, within: Duration) -> u32
+where
+    S: futures::Stream<Item = Result<ReceivedMessage, IggyError>> + Unpin,
+{
+    let deadline = tokio::time::Instant::now() + within;
+    let mut consumed = 0u32;
+    while consumed < expected && tokio::time::Instant::now() < deadline {
+        match timeout(Duration::from_millis(500), consumer.next()).await {
+            Ok(Some(Ok(_))) => consumed += 1,
+            // A stream error or an idle 500 ms window: keep polling until the deadline.
+            Ok(Some(Err(_))) | Err(_) => {}
+            // The stream has terminated, so no further messages can arrive.
+            // `next()` would now resolve immediately on every call, so stop
+            // instead of busy-spinning until the deadline.
+            Ok(None) => break,
+        }
+    }
+    consumed
+}
diff --git a/core/integration/tests/server/specific.rs 
b/core/integration/tests/server/specific.rs
index ad569b1bd..4f5d33f4a 100644
--- a/core/integration/tests/server/specific.rs
+++ b/core/integration/tests/server/specific.rs
@@ -18,8 +18,8 @@
  */
 
 use crate::server::scenarios::{
-    message_size_scenario, segment_rotation_race_scenario, 
single_message_per_batch_scenario,
-    tcp_tls_scenario, websocket_tls_scenario,
+    message_size_scenario, reconnect_after_restart_scenario, 
segment_rotation_race_scenario,
+    single_message_per_batch_scenario, tcp_tls_scenario, 
websocket_tls_scenario,
 };
 use integration::iggy_harness;
 
@@ -60,6 +60,34 @@ async fn 
should_handle_single_message_per_batch_with_delayed_persistence(harness
     single_message_per_batch_scenario::run(harness, 5).await;
 }
 
+/// Tests that the producer can auto-reconnect and resume sending messages after
+/// the server is restarted.
+///
+/// Parameterised over the TCP, WebSocket and QUIC client transports (HTTP is
+/// not listed).
+// NOTE(review): the reasons for these server overrides are not visible here.
+// The long QUIC idle timeout (500s) presumably keeps QUIC from closing the
+// connection on its own during the test, so that the server restart is what
+// breaks it. Confirm.
+#[iggy_harness(
+    test_client_transport = [Tcp, WebSocket, Quic],
+    server(
+        tcp.socket.override_defaults = true,
+        tcp.socket.nodelay = true,
+        quic.max_idle_timeout = "500s",
+        quic.keep_alive_interval = "15s"
+    )
+)]
+async fn producer_reconnect_after_server_restart(harness: &mut TestHarness) {
+    reconnect_after_restart_scenario::run_producer(harness).await;
+}
+
+/// Tests that the consumer can auto-reconnect and resume polling messages after
+/// the server is restarted.
+///
+/// Uses the same transports and server overrides as
+/// `producer_reconnect_after_server_restart`.
+#[iggy_harness(
+    test_client_transport = [Tcp, WebSocket, Quic],
+    server(
+        tcp.socket.override_defaults = true,
+        tcp.socket.nodelay = true,
+        quic.max_idle_timeout = "500s",
+        quic.keep_alive_interval = "15s"
+    )
+)]
+async fn consumer_reconnect_after_server_restart(harness: &mut TestHarness) {
+    reconnect_after_restart_scenario::run_consumer(harness).await;
+}
+
 /// This test configures the server to trigger frequent segment rotations and 
runs
 /// multiple concurrent producers across all protocols (TCP, HTTP, QUIC, 
WebSocket)
 /// to maximize the chance of hitting the race condition between 
persist_messages_to_disk
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index 8bb6863ea..7a6748ea9 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy"
-version = "0.9.1-edge.1"
+version = "0.9.1-edge.2"
 description = "Iggy is the persistent message streaming platform written in 
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing 
millions of messages per second."
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/sdk/src/quic/quic_client.rs b/core/sdk/src/quic/quic_client.rs
index 9ca335765..0a2972f31 100644
--- a/core/sdk/src/quic/quic_client.rs
+++ b/core/sdk/src/quic/quic_client.rs
@@ -118,7 +118,13 @@ impl BinaryTransport for QuicClient {
         let error = result.unwrap_err();
         if !matches!(
             error,
-            IggyError::Disconnected | IggyError::EmptyResponse | 
IggyError::Unauthenticated
+            IggyError::Disconnected
+                | IggyError::EmptyResponse
+                | IggyError::Unauthenticated
+                | IggyError::StaleClient
+                | IggyError::NotConnected
+                | IggyError::CannotEstablishConnection
+                | IggyError::QuicError
         ) {
             return Err(error);
         }
diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs
index 0ff62179c..01aa47449 100644
--- a/core/sdk/src/tcp/tcp_client.rs
+++ b/core/sdk/src/tcp/tcp_client.rs
@@ -122,6 +122,7 @@ impl BinaryTransport for TcpClient {
                 | IggyError::StaleClient
                 | IggyError::NotConnected
                 | IggyError::CannotEstablishConnection
+                | IggyError::TcpError
         ) {
             return Err(error);
         }
diff --git a/core/sdk/src/websocket/websocket_client.rs 
b/core/sdk/src/websocket/websocket_client.rs
index c64233daf..7da76d559 100644
--- a/core/sdk/src/websocket/websocket_client.rs
+++ b/core/sdk/src/websocket/websocket_client.rs
@@ -119,6 +119,12 @@ impl BinaryTransport for WebSocketClient {
                 | IggyError::EmptyResponse
                 | IggyError::Unauthenticated
                 | IggyError::StaleClient
+                | IggyError::NotConnected
+                | IggyError::CannotEstablishConnection
+                | IggyError::TcpError
+                | IggyError::ConnectionClosed
+                | IggyError::WebSocketSendError
+                | IggyError::WebSocketReceiveError
         ) {
             return Err(error);
         }

Reply via email to