This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 315bfb098 fix(rust): auto-reconnect on transport errors after server restart (#2880)
315bfb098 is described below

commit 315bfb098df673bb0b5ee9446f09128b5c24f5ba
Author: Piotr Gankiewicz <[email protected]>
AuthorDate: Mon Mar 9 12:31:40 2026 +0100

    fix(rust): auto-reconnect on transport errors after server restart (#2880)
    
    The SDK's send_raw_with_response() only triggered reconnect for a
    subset of errors (Disconnected, EmptyResponse, Unauthenticated,
    StaleClient). Transport-specific write errors like TcpError,
    ConnectionClosed, WebSocketSendError, and QuicError were not in
    the list, causing producers and consumers to loop forever on a
    dead connection instead of reconnecting.
    
    Added all transport-specific errors to the reconnect match in
    TCP, WebSocket, and QUIC clients. TLS connections use the same
    error types and code path, so they are also covered.
    
    Integration tests verify both producer and consumer reconnect
    across all three stateful transports (TCP, WebSocket, QUIC).
---
 Cargo.lock                                         |   2 +-
 Cargo.toml                                         |   2 +-
 DEPENDENCIES.md                                    |   2 +-
 core/integration/tests/server/scenarios/mod.rs     |   1 +
 .../scenarios/reconnect_after_restart_scenario.rs  | 226 +++++++++++++++++++++
 core/integration/tests/server/specific.rs          |  30 ++-
 core/sdk/Cargo.toml                                |   2 +-
 core/sdk/src/quic/quic_client.rs                   |   8 +-
 core/sdk/src/tcp/tcp_client.rs                     |   1 +
 core/sdk/src/websocket/websocket_client.rs         |   6 +
 10 files changed, 273 insertions(+), 7 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 9bd245b86..23b34e3a8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5011,7 +5011,7 @@ checksum = "cd62e6b5e86ea8eeeb8db1de02880a6abc01a397b2ebb64b5d74ac255318f5cb"
 
 [[package]]
 name = "iggy"
-version = "0.9.1-edge.1"
+version = "0.9.1-edge.2"
 dependencies = [
  "async-broadcast",
  "async-dropper",
diff --git a/Cargo.toml b/Cargo.toml
index d2bbfe567..e0140f2b2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -157,7 +157,7 @@ humantime = "2.3.0"
 hwlocality = "1.0.0-alpha.11"
 iceberg = "0.8.0"
 iceberg-catalog-rest = "0.8.0"
-iggy = { path = "core/sdk", version = "0.9.1-edge.1" }
+iggy = { path = "core/sdk", version = "0.9.1-edge.2" }
 iggy_binary_protocol = { path = "core/binary_protocol", version = "0.9.1-edge.1" }
 iggy_common = { path = "core/common", version = "0.9.1-edge.1" }
 iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.2.1-edge.1" }
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index e3f450a7e..31c1302ff 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -439,7 +439,7 @@ ident_case: 1.0.1, "Apache-2.0 OR MIT",
 idna: 1.1.0, "Apache-2.0 OR MIT",
 idna_adapter: 1.2.1, "Apache-2.0 OR MIT",
 if_chain: 1.0.3, "Apache-2.0 OR MIT",
-iggy: 0.9.1-edge.1, "Apache-2.0",
+iggy: 0.9.1-edge.2, "Apache-2.0",
 iggy-bench: 0.4.1-edge.1, "Apache-2.0",
 iggy-bench-dashboard-server: 0.6.2-edge.1, "Apache-2.0",
 iggy-cli: 0.11.1-edge.1, "Apache-2.0",
diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs
index 5a3b3f7a4..fb510234f 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -38,6 +38,7 @@ pub mod offset_scenario;
 pub mod permissions_scenario;
 pub mod purge_delete_scenario;
 pub mod read_during_persistence_scenario;
+pub mod reconnect_after_restart_scenario;
 pub mod segment_rotation_race_scenario;
 pub mod single_message_per_batch_scenario;
 pub mod snapshot_scenario;
diff --git a/core/integration/tests/server/scenarios/reconnect_after_restart_scenario.rs b/core/integration/tests/server/scenarios/reconnect_after_restart_scenario.rs
new file mode 100644
index 000000000..2d2a967f7
--- /dev/null
+++ b/core/integration/tests/server/scenarios/reconnect_after_restart_scenario.rs
@@ -0,0 +1,226 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use futures::StreamExt;
+use iggy::prelude::*;
+use iggy_common::TransportProtocol;
+use integration::harness::{TestBinary, TestHarness};
+use std::str::FromStr;
+use tokio::time::{Duration, sleep, timeout};
+
+const STREAM_NAME: &str = "test-reconnect-stream";
+const TOPIC_NAME: &str = "test-reconnect-topic";
+
+pub async fn run_producer(harness: &mut TestHarness) {
+    let client = create_client(harness);
+    Client::connect(&client).await.expect("Failed to connect");
+
+    let producer = client
+        .producer(STREAM_NAME, TOPIC_NAME)
+        .expect("Failed to create producer builder")
+        .create_stream_if_not_exists()
+        .create_topic_if_not_exists(
+            1,
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .send_retries(Some(10), Some(IggyDuration::from_str("2s").unwrap()))
+        .build();
+
+    producer
+        .init()
+        .await
+        .expect("Failed to initialize producer");
+
+    let msg = IggyMessage::from_str("before-restart").unwrap();
+    producer
+        .send(vec![msg])
+        .await
+        .expect("Initial send should succeed");
+
+    // Stop the server, attempt a send while it is down, then restart.
+    // The SDK should auto-reconnect and deliver the message.
+    harness.server_mut().stop().expect("Failed to stop server");
+    sleep(Duration::from_secs(2)).await;
+
+    let send_handle = tokio::spawn(async move {
+        let msg = IggyMessage::from_str("after-restart").unwrap();
+        producer.send(vec![msg]).await
+    });
+
+    sleep(Duration::from_secs(1)).await;
+    harness
+        .server_mut()
+        .start()
+        .expect("Failed to start server");
+
+    let send_result = timeout(Duration::from_secs(60), send_handle)
+        .await
+        .expect("Timed out waiting for send after server restart")
+        .expect("Send task panicked");
+    send_result.expect("Send after server restart should succeed");
+
+    let poll_client = harness
+        .root_client()
+        .await
+        .expect("Failed to create polling client");
+
+    let polled = poll_client
+        .poll_messages(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            Some(0),
+            &Consumer::default(),
+            &PollingStrategy::offset(0),
+            10,
+            false,
+        )
+        .await
+        .expect("Failed to poll messages after restart");
+
+    assert!(
+        !polled.messages.is_empty(),
+        "Expected at least one message after server restart"
+    );
+}
+
+pub async fn run_consumer(harness: &mut TestHarness) {
+    let setup_client = harness
+        .root_client()
+        .await
+        .expect("Failed to create setup client");
+
+    setup_client
+        .create_stream(STREAM_NAME)
+        .await
+        .expect("Failed to create stream");
+    setup_client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            1,
+            Default::default(),
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .expect("Failed to create topic");
+
+    send_messages(&setup_client, "pre-restart", 3).await;
+    drop(setup_client);
+
+    let client = create_client(harness);
+    Client::connect(&client).await.expect("Failed to connect");
+
+    let mut consumer = client
+        .consumer("test-consumer", STREAM_NAME, TOPIC_NAME, 0)
+        .expect("Failed to create consumer builder")
+        .polling_strategy(PollingStrategy::next())
+        .batch_length(10)
+        .poll_interval(IggyDuration::from_str("100ms").unwrap())
+        .polling_retry_interval(IggyDuration::from_str("500ms").unwrap())
+        .build();
+
+    consumer
+        .init()
+        .await
+        .expect("Failed to initialize consumer");
+
+    assert_eq!(
+        consume_messages(&mut consumer, 3, Duration::from_secs(10)).await,
+        3,
+        "Should consume all pre-restart messages"
+    );
+
+    harness.server_mut().stop().expect("Failed to stop server");
+    sleep(Duration::from_secs(2)).await;
+    harness
+        .server_mut()
+        .start()
+        .expect("Failed to start server");
+
+    let post_client = harness
+        .root_client()
+        .await
+        .expect("Failed to create post-restart client");
+    send_messages(&post_client, "post-restart", 3).await;
+
+    assert_eq!(
+        consume_messages(&mut consumer, 3, Duration::from_secs(30)).await,
+        3,
+        "Should consume all post-restart messages after reconnect"
+    );
+}
+
+fn create_client(harness: &TestHarness) -> IggyClient {
+    let transport = harness.transport().expect("No transport configured");
+    let server = harness.server();
+
+    let addr = match transport {
+        TransportProtocol::Tcp => server.tcp_addr().expect("TCP address not available"),
+        TransportProtocol::Quic => server.quic_addr().expect("QUIC address not available"),
+        TransportProtocol::WebSocket => server
+            .websocket_addr()
+            .expect("WebSocket address not available"),
+        TransportProtocol::Http => panic!("HTTP is stateless and does not support reconnect"),
+    };
+
+    let protocol_prefix = match transport {
+        TransportProtocol::Tcp => "iggy",
+        TransportProtocol::Quic => "iggy+quic",
+        TransportProtocol::WebSocket => "iggy+ws",
+        TransportProtocol::Http => unreachable!(),
+    };
+
+    let connection_string = format!("{protocol_prefix}://iggy:iggy@{addr}?heartbeat_interval=5min");
+    IggyClient::from_connection_string(&connection_string)
+        .expect("Failed to create client from connection string")
+}
+
+async fn send_messages(client: &IggyClient, prefix: &str, count: u32) {
+    for i in 0..count {
+        let msg = IggyMessage::from_str(&format!("{prefix}-{i}")).unwrap();
+        client
+            .send_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                &Partitioning::partition_id(0),
+                &mut [msg],
+            )
+            .await
+            .expect("Failed to send message");
+    }
+}
+
+async fn consume_messages(consumer: &mut IggyConsumer, expected: u32, max_wait: Duration) -> u32 {
+    let mut consumed = 0u32;
+    let deadline = tokio::time::Instant::now() + max_wait;
+    while consumed < expected && tokio::time::Instant::now() < deadline {
+        let poll_result: Option<Result<ReceivedMessage, IggyError>> =
+            tokio::time::timeout(Duration::from_millis(500), consumer.next())
+                .await
+                .ok()
+                .flatten();
+        if let Some(Ok(_)) = poll_result {
+            consumed += 1;
+        }
+    }
+    consumed
+}
diff --git a/core/integration/tests/server/specific.rs b/core/integration/tests/server/specific.rs
index ad569b1bd..2a105939d 100644
--- a/core/integration/tests/server/specific.rs
+++ b/core/integration/tests/server/specific.rs
@@ -18,8 +18,8 @@
  */
 
 use crate::server::scenarios::{
-    message_size_scenario, segment_rotation_race_scenario, single_message_per_batch_scenario,
-    tcp_tls_scenario, websocket_tls_scenario,
+    message_size_scenario, reconnect_after_restart_scenario, segment_rotation_race_scenario,
+    single_message_per_batch_scenario, tcp_tls_scenario, websocket_tls_scenario,
 };
 use integration::iggy_harness;
 
@@ -60,6 +60,32 @@ async fn should_handle_single_message_per_batch_with_delayed_persistence(harness
     single_message_per_batch_scenario::run(harness, 5).await;
 }
 
+#[iggy_harness(
+    test_client_transport = [Tcp, WebSocket, Quic],
+    server(
+        tcp.socket.override_defaults = true,
+        tcp.socket.nodelay = true,
+        quic.max_idle_timeout = "500s",
+        quic.keep_alive_interval = "15s"
+    )
+)]
+async fn producer_reconnect_after_server_restart(harness: &mut TestHarness) {
+    reconnect_after_restart_scenario::run_producer(harness).await;
+}
+
+#[iggy_harness(
+    test_client_transport = [Tcp, WebSocket, Quic],
+    server(
+        tcp.socket.override_defaults = true,
+        tcp.socket.nodelay = true,
+        quic.max_idle_timeout = "500s",
+        quic.keep_alive_interval = "15s"
+    )
+)]
+async fn consumer_reconnect_after_server_restart(harness: &mut TestHarness) {
+    reconnect_after_restart_scenario::run_consumer(harness).await;
+}
+
 /// This test configures the server to trigger frequent segment rotations and runs
 /// multiple concurrent producers across all protocols (TCP, HTTP, QUIC, WebSocket)
 /// to maximize the chance of hitting the race condition between persist_messages_to_disk
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index 8bb6863ea..7a6748ea9 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy"
-version = "0.9.1-edge.1"
+version = "0.9.1-edge.2"
 description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/sdk/src/quic/quic_client.rs b/core/sdk/src/quic/quic_client.rs
index 9ca335765..0a2972f31 100644
--- a/core/sdk/src/quic/quic_client.rs
+++ b/core/sdk/src/quic/quic_client.rs
@@ -118,7 +118,13 @@ impl BinaryTransport for QuicClient {
         let error = result.unwrap_err();
         if !matches!(
             error,
-            IggyError::Disconnected | IggyError::EmptyResponse | IggyError::Unauthenticated
+            IggyError::Disconnected
+                | IggyError::EmptyResponse
+                | IggyError::Unauthenticated
+                | IggyError::StaleClient
+                | IggyError::NotConnected
+                | IggyError::CannotEstablishConnection
+                | IggyError::QuicError
         ) {
             return Err(error);
         }
diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs
index 0ff62179c..01aa47449 100644
--- a/core/sdk/src/tcp/tcp_client.rs
+++ b/core/sdk/src/tcp/tcp_client.rs
@@ -122,6 +122,7 @@ impl BinaryTransport for TcpClient {
                 | IggyError::StaleClient
                 | IggyError::NotConnected
                 | IggyError::CannotEstablishConnection
+                | IggyError::TcpError
         ) {
             return Err(error);
         }
diff --git a/core/sdk/src/websocket/websocket_client.rs b/core/sdk/src/websocket/websocket_client.rs
index c64233daf..7da76d559 100644
--- a/core/sdk/src/websocket/websocket_client.rs
+++ b/core/sdk/src/websocket/websocket_client.rs
@@ -119,6 +119,12 @@ impl BinaryTransport for WebSocketClient {
                 | IggyError::EmptyResponse
                 | IggyError::Unauthenticated
                 | IggyError::StaleClient
+                | IggyError::NotConnected
+                | IggyError::CannotEstablishConnection
+                | IggyError::TcpError
+                | IggyError::ConnectionClosed
+                | IggyError::WebSocketSendError
+                | IggyError::WebSocketReceiveError
         ) {
             return Err(error);
         }

Reply via email to