This is an automated email from the ASF dual-hosted git repository. piotr pushed a commit to branch fix_rust_sdk_reconnection in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 861a948125ef92eac23460d1fafc5181be7cbab0 Author: spetz <[email protected]> AuthorDate: Thu Mar 5 19:48:28 2026 +0100 fix(rust): auto-reconnect on transport errors after server restart --- Cargo.lock | 2 +- Cargo.toml | 57 +---- DEPENDENCIES.md | 2 +- core/ai/mcp/Cargo.toml | 6 +- core/integration/Cargo.toml | 7 +- core/integration/tests/server/scenarios/mod.rs | 1 + .../scenarios/reconnect_after_restart_scenario.rs | 257 +++++++++++++++++++++ core/integration/tests/server/specific.rs | 32 ++- core/sdk/Cargo.toml | 2 +- core/sdk/src/quic/quic_client.rs | 8 +- core/sdk/src/tcp/tcp_client.rs | 1 + core/sdk/src/websocket/websocket_client.rs | 6 + 12 files changed, 314 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9bd245b86..23b34e3a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5011,7 +5011,7 @@ checksum = "cd62e6b5e86ea8eeeb8db1de02880a6abc01a397b2ebb64b5d74ac255318f5cb" [[package]] name = "iggy" -version = "0.9.1-edge.1" +version = "0.9.1-edge.2" dependencies = [ "async-broadcast", "async-dropper", diff --git a/Cargo.toml b/Cargo.toml index d2bbfe567..1f7af68e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -96,19 +96,7 @@ clap_complete = "4.5.66" clock = { path = "core/clock" } colored = "3.1.1" comfy-table = "7.2.2" -compio = { version = "0.18.0", features = [ - "runtime", - "macros", - "io-uring", - "time", - "rustls", - "ring", - "net", - "quic", - "tls", - "ws", - "fs", -] } +compio = { version = "0.18.0", features = ["runtime", "macros", "io-uring", "time", "rustls", "ring", "net", "quic", "tls", "ws", "fs"] } # Pin compio-driver >= 0.11.2 to fix musl compilation (compio-rs/compio#668) compio-driver = "0.11.2" configs = { path = "core/configs", version = "0.1.0" } @@ -157,7 +145,7 @@ humantime = "2.3.0" hwlocality = "1.0.0-alpha.11" iceberg = "0.8.0" iceberg-catalog-rest = "0.8.0" -iggy = { path = "core/sdk", version = "0.9.1-edge.1" } +iggy = { path = "core/sdk", version = "0.9.1-edge.2" } iggy_binary_protocol = { path = "core/binary_protocol", version = "0.9.1-edge.1" } iggy_common = { path = "core/common", version = "0.9.1-edge.1" } iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.2.1-edge.1" } @@ -185,22 +173,9 @@ octocrab = "0.49.5" once_cell = "1.21.3" opentelemetry = { version = "0.31.0", features = ["trace", "logs"] } opentelemetry-appender-tracing = { version = "0.31.1", features = ["log"] } -opentelemetry-otlp = { version = "0.31.0", features = [ - "logs", - "trace", - "grpc-tonic", - "http", - "http-proto", - "reqwest-client", -] } +opentelemetry-otlp = { version = "0.31.0", features = ["logs", "trace", "grpc-tonic", "http", "http-proto", "reqwest-client"] } opentelemetry-semantic-conventions = "0.31.0" -opentelemetry_sdk = { version = "0.31.0", features = [ - "logs", - "trace", - "experimental_async_runtime", - "experimental_logs_batch_log_processor_with_async_runtime", - "experimental_trace_batch_span_processor_with_async_runtime", -] } +opentelemetry_sdk = { version = "0.31.0", features = ["logs", "trace", "experimental_async_runtime", "experimental_logs_batch_log_processor_with_async_runtime", "experimental_trace_batch_span_processor_with_async_runtime"] } papaya = "0.2.3" parquet = "57.3.0" partitions = { path = "core/partitions" } @@ -245,13 +220,7 @@ server = { path = "core/server" } simd-json = { version = "0.17.0", features = ["serde_impl"] } slab = "0.4.12" socket2 = "0.6.2" -sqlx = { version = "0.8.6", features = [ - "runtime-tokio-rustls", - "postgres", - "chrono", - "uuid", - "json", -] } +sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "postgres", "chrono", "uuid", "json"] } static-toml = "1.3.0" strum = { version = "0.27.2", features = ["derive"] } strum_macros = "0.27.2" @@ -270,11 +239,7 @@ tower-http = { version = "0.6.8", features = ["add-extension", "cors", "trace"] tracing = "0.1.44" tracing-appender = "0.2.4" tracing-opentelemetry = "0.32.1" -tracing-subscriber = { version = "0.3.22", default-features = false, features = [ - "fmt", - "env-filter", - "ansi", -] } +tracing-subscriber = { version = "0.3.22", default-features = false, features = ["fmt", "env-filter", "ansi"] } trait-variant = "0.1.2" tungstenite = "0.28.0" twox-hash = { version = "2.1.2", features = ["xxhash32"] } @@ -283,15 +248,7 @@ uuid = { version = "1.20.0", features = ["v4", "v7", "fast-rng", "serde", "zeroc vergen-git2 = { version = "9.1.0", features = ["build", "cargo", "rustc", "si"] } walkdir = "2.5.0" wasm-bindgen = "0.2" -web-sys = { version = "0.3", features = [ - "Window", - "Location", - "HtmlSelectElement", - "Clipboard", - "Navigator", - "ResizeObserver", - "ResizeObserverEntry", -] } +web-sys = { version = "0.3", features = ["Window", "Location", "HtmlSelectElement", "Clipboard", "Navigator", "ResizeObserver", "ResizeObserverEntry"] } webpki-roots = "1.0.6" yew = { version = "0.22", features = ["csr"] } yew-router = "0.19" diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index e3f450a7e..31c1302ff 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -439,7 +439,7 @@ ident_case: 1.0.1, "Apache-2.0 OR MIT", idna: 1.1.0, "Apache-2.0 OR MIT", idna_adapter: 1.2.1, "Apache-2.0 OR MIT", if_chain: 1.0.3, "Apache-2.0 OR MIT", -iggy: 0.9.1-edge.1, "Apache-2.0", +iggy: 0.9.1-edge.2, "Apache-2.0", iggy-bench: 0.4.1-edge.1, "Apache-2.0", iggy-bench-dashboard-server: 0.6.2-edge.1, "Apache-2.0", iggy-cli: 0.11.1-edge.1, "Apache-2.0", diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml index 78046be4e..b2a6f40bb 100644 --- a/core/ai/mcp/Cargo.toml +++ b/core/ai/mcp/Cargo.toml @@ -42,11 +42,7 @@ opentelemetry-otlp = { workspace = true } opentelemetry-semantic-conventions = { workspace = true } opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] } reqwest = { workspace = true } -rmcp = { workspace = true, features = [ - "server", - "transport-io", - "transport-streamable-http-server", -] } +rmcp = { workspace = true, features = ["server", "transport-io", "transport-streamable-http-server"] } serde = { workspace = true } serde_json = { workspace = true } socket2 = "0.6" diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml index 13afb04dc..c76b6fe7f 100644 --- a/core/integration/Cargo.toml +++ b/core/integration/Cargo.toml @@ -54,12 +54,7 @@ rcgen = { workspace = true } reqwest = { workspace = true } reqwest-middleware = { workspace = true } reqwest-retry = { workspace = true } -rmcp = { workspace = true, features = [ - "client", - "reqwest", - "transport-streamable-http-client", - "transport-streamable-http-client-reqwest", -] } +rmcp = { workspace = true, features = ["client", "reqwest", "transport-streamable-http-client", "transport-streamable-http-client-reqwest"] } serde = { workspace = true } serde_json = { workspace = true } serial_test = { workspace = true } diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs index 5a3b3f7a4..fb510234f 100644 --- a/core/integration/tests/server/scenarios/mod.rs +++ b/core/integration/tests/server/scenarios/mod.rs @@ -38,6 +38,7 @@ pub mod offset_scenario; pub mod permissions_scenario; pub mod purge_delete_scenario; pub mod read_during_persistence_scenario; +pub mod reconnect_after_restart_scenario; pub mod segment_rotation_race_scenario; pub mod single_message_per_batch_scenario; pub mod snapshot_scenario; diff --git a/core/integration/tests/server/scenarios/reconnect_after_restart_scenario.rs b/core/integration/tests/server/scenarios/reconnect_after_restart_scenario.rs new file mode 100644 index 000000000..67e8cc632 --- /dev/null +++ b/core/integration/tests/server/scenarios/reconnect_after_restart_scenario.rs @@ -0,0 +1,257 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use futures::StreamExt; +use iggy::prelude::*; +use iggy_common::TransportProtocol; +use integration::harness::{TestBinary, TestHarness}; +use std::str::FromStr; +use tokio::time::{Duration, sleep, timeout}; + +const STREAM_NAME: &str = "test-reconnect-stream"; +const TOPIC_NAME: &str = "test-reconnect-topic"; + +fn connection_string(harness: &TestHarness) -> String { + let transport = harness.transport().expect("No transport configured"); + let server = harness.server(); + + let addr = match transport { + TransportProtocol::Tcp => server.tcp_addr().expect("TCP address not available"), + TransportProtocol::Quic => server.quic_addr().expect("QUIC address not available"), + TransportProtocol::WebSocket => server + .websocket_addr() + .expect("WebSocket address not available"), + TransportProtocol::Http => panic!("HTTP is stateless and does not support reconnect"), + }; + + let protocol_prefix = match transport { + TransportProtocol::Tcp => "iggy", + TransportProtocol::Quic => "iggy+quic", + TransportProtocol::WebSocket => "iggy+ws", + TransportProtocol::Http => unreachable!(), + }; + + // Use a very long heartbeat so the client's send/poll is the first operation + // to hit the broken connection, exercising the write-path error handling + // rather than the heartbeat's read-based detection. + format!("{protocol_prefix}://iggy:iggy@{addr}?heartbeat_interval=5min") +} + +/// Verifies that the producer can auto-reconnect and resume sending messages +/// after the server is stopped and restarted. +pub async fn run_producer(harness: &mut TestHarness) { + let conn_str = connection_string(harness); + + let client = IggyClient::from_connection_string(&conn_str) + .expect("Failed to create client from connection string"); + Client::connect(&client).await.expect("Failed to connect"); + + let producer = client + .producer(STREAM_NAME, TOPIC_NAME) + .expect("Failed to create producer builder") + .create_stream_if_not_exists() + .create_topic_if_not_exists( + 1, + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .send_retries(Some(10), Some(IggyDuration::from_str("2s").unwrap())) + .build(); + + producer + .init() + .await + .expect("Failed to initialize producer"); + + let msg = IggyMessage::from_str("before-restart").unwrap(); + producer + .send(vec![msg]) + .await + .expect("Initial send should succeed"); + + // Stop the server and try to send while it is down — this forces an error + // on the write path. Then start the server back up so the SDK's reconnect + // logic can re-establish the connection. + harness.server_mut().stop().expect("Failed to stop server"); + sleep(Duration::from_secs(2)).await; + + let send_handle = tokio::spawn(async move { + let msg = IggyMessage::from_str("after-restart").unwrap(); + producer.send(vec![msg]).await + }); + + sleep(Duration::from_secs(1)).await; + harness + .server_mut() + .start() + .expect("Failed to start server"); + + let send_result = timeout(Duration::from_secs(60), send_handle) + .await + .expect("Timed out waiting for send after server restart") + .expect("Send task panicked"); + send_result.expect("Send after server restart should succeed"); + + let poll_client = harness + .root_client() + .await + .expect("Failed to create polling client"); + + let polled = poll_client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + Some(0), + &Consumer::default(), + &PollingStrategy::offset(0), + 10, + false, + ) + .await + .expect("Failed to poll messages after restart"); + + assert!( + !polled.messages.is_empty(), + "Expected at least one message after server restart" + ); +} + +/// Verifies that the consumer can auto-reconnect and resume polling messages +/// after the server is stopped and restarted. +pub async fn run_consumer(harness: &mut TestHarness) { + let conn_str = connection_string(harness); + + // Seed the stream/topic with messages before creating the consumer. + let setup_client = harness + .root_client() + .await + .expect("Failed to create setup client"); + + setup_client + .create_stream(STREAM_NAME) + .await + .expect("Failed to create stream"); + setup_client + .create_topic( + &Identifier::named(STREAM_NAME).unwrap(), + TOPIC_NAME, + 1, + Default::default(), + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .expect("Failed to create topic"); + + for i in 0..3u32 { + let msg = IggyMessage::from_str(&format!("pre-restart-{i}")).unwrap(); + setup_client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(0), + &mut [msg], + ) + .await + .expect("Failed to send seed message"); + } + drop(setup_client); + + // Create the consumer client with a long heartbeat. + let client = IggyClient::from_connection_string(&conn_str) + .expect("Failed to create client from connection string"); + Client::connect(&client).await.expect("Failed to connect"); + + let mut consumer = client + .consumer("test-consumer", STREAM_NAME, TOPIC_NAME, 0) + .expect("Failed to create consumer builder") + .polling_strategy(PollingStrategy::next()) + .batch_length(10) + .poll_interval(IggyDuration::from_str("100ms").unwrap()) + .polling_retry_interval(IggyDuration::from_str("500ms").unwrap()) + .build(); + + consumer + .init() + .await + .expect("Failed to initialize consumer"); + + // Consume the pre-restart messages to confirm the consumer works. + let mut consumed_before = 0u32; + let deadline = tokio::time::Instant::now() + Duration::from_secs(10); + while consumed_before < 3 && tokio::time::Instant::now() < deadline { + let poll_result: Option<Result<ReceivedMessage, IggyError>> = + tokio::time::timeout(Duration::from_millis(500), consumer.next()) + .await + .ok() + .flatten(); + if let Some(Ok(_)) = poll_result { + consumed_before += 1; + } + } + assert_eq!( + consumed_before, 3, + "Should consume all pre-restart messages" + ); + + // Stop the server, wait for the connection to break, then restart. + harness.server_mut().stop().expect("Failed to stop server"); + sleep(Duration::from_secs(2)).await; + harness + .server_mut() + .start() + .expect("Failed to start server"); + + // Send new messages after the server is back up. + let post_client = harness + .root_client() + .await + .expect("Failed to create post-restart client"); + for i in 0..3u32 { + let msg = IggyMessage::from_str(&format!("post-restart-{i}")).unwrap(); + post_client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(0), + &mut [msg], + ) + .await + .expect("Failed to send post-restart message"); + } + + // The consumer should auto-reconnect and resume polling. + let mut consumed_after = 0u32; + let deadline = tokio::time::Instant::now() + Duration::from_secs(30); + while consumed_after < 3 && tokio::time::Instant::now() < deadline { + let poll_result: Option<Result<ReceivedMessage, IggyError>> = + tokio::time::timeout(Duration::from_millis(500), consumer.next()) + .await + .ok() + .flatten(); + if let Some(Ok(_)) = poll_result { + consumed_after += 1; + } + } + assert_eq!( + consumed_after, 3, + "Should consume all post-restart messages after reconnect" + ); +} diff --git a/core/integration/tests/server/specific.rs b/core/integration/tests/server/specific.rs index ad569b1bd..4f5d33f4a 100644 --- a/core/integration/tests/server/specific.rs +++ b/core/integration/tests/server/specific.rs @@ -18,8 +18,8 @@ */ use crate::server::scenarios::{ - message_size_scenario, segment_rotation_race_scenario, single_message_per_batch_scenario, - tcp_tls_scenario, websocket_tls_scenario, + message_size_scenario, reconnect_after_restart_scenario, segment_rotation_race_scenario, + single_message_per_batch_scenario, tcp_tls_scenario, websocket_tls_scenario, }; use integration::iggy_harness; @@ -60,6 +60,34 @@ async fn should_handle_single_message_per_batch_with_delayed_persistence(harness single_message_per_batch_scenario::run(harness, 5).await; } +/// Tests that the producer can auto-reconnect and resume sending messages after the server is restarted. +#[iggy_harness( + test_client_transport = [Tcp, WebSocket, Quic], + server( + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true, + quic.max_idle_timeout = "500s", + quic.keep_alive_interval = "15s" + ) +)] +async fn producer_reconnect_after_server_restart(harness: &mut TestHarness) { + reconnect_after_restart_scenario::run_producer(harness).await; +} + +/// Tests that the consumer can auto-reconnect and resume polling messages after the server is restarted. +#[iggy_harness( + test_client_transport = [Tcp, WebSocket, Quic], + server( + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true, + quic.max_idle_timeout = "500s", + quic.keep_alive_interval = "15s" + ) +)] +async fn consumer_reconnect_after_server_restart(harness: &mut TestHarness) { + reconnect_after_restart_scenario::run_consumer(harness).await; +} + /// This test configures the server to trigger frequent segment rotations and runs /// multiple concurrent producers across all protocols (TCP, HTTP, QUIC, WebSocket) /// to maximize the chance of hitting the race condition between persist_messages_to_disk diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml index 8bb6863ea..7a6748ea9 100644 --- a/core/sdk/Cargo.toml +++ b/core/sdk/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy" -version = "0.9.1-edge.1" +version = "0.9.1-edge.2" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2024" license = "Apache-2.0" diff --git a/core/sdk/src/quic/quic_client.rs b/core/sdk/src/quic/quic_client.rs index 9ca335765..0a2972f31 100644 --- a/core/sdk/src/quic/quic_client.rs +++ b/core/sdk/src/quic/quic_client.rs @@ -118,7 +118,13 @@ impl BinaryTransport for QuicClient { let error = result.unwrap_err(); if !matches!( error, - IggyError::Disconnected | IggyError::EmptyResponse | IggyError::Unauthenticated + IggyError::Disconnected + | IggyError::EmptyResponse + | IggyError::Unauthenticated + | IggyError::StaleClient + | IggyError::NotConnected + | IggyError::CannotEstablishConnection + | IggyError::QuicError ) { return Err(error); } diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs index 0ff62179c..01aa47449 100644 --- a/core/sdk/src/tcp/tcp_client.rs +++ b/core/sdk/src/tcp/tcp_client.rs @@ -122,6 +122,7 @@ impl BinaryTransport for TcpClient { | IggyError::StaleClient | IggyError::NotConnected | IggyError::CannotEstablishConnection + | IggyError::TcpError ) { return Err(error); } diff --git a/core/sdk/src/websocket/websocket_client.rs b/core/sdk/src/websocket/websocket_client.rs index c64233daf..7da76d559 100644 --- a/core/sdk/src/websocket/websocket_client.rs +++ b/core/sdk/src/websocket/websocket_client.rs @@ -119,6 +119,12 @@ impl BinaryTransport for WebSocketClient { | IggyError::EmptyResponse | IggyError::Unauthenticated | IggyError::StaleClient + | IggyError::NotConnected + | IggyError::CannotEstablishConnection + | IggyError::TcpError + | IggyError::ConnectionClosed + | IggyError::WebSocketSendError + | IggyError::WebSocketReceiveError ) { return Err(error); }
