This is an automated email from the ASF dual-hosted git repository.

numinnex pushed a commit to branch integration_tests
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/integration_tests by this push:
     new 795edc6b3 review nits
795edc6b3 is described below

commit 795edc6b3b716a7fa56fdcdf5d528da82d00670d
Author: numinex <[email protected]>
AuthorDate: Fri May 15 17:58:15 2026 +0200

    review nits
---
 Cargo.lock                                         | 10 ----
 Cargo.toml                                         |  1 -
 core/integration-vsr/Cargo.toml                    | 30 ------------
 core/integration/Cargo.toml                        |  1 +
 core/integration/src/harness/handle/server.rs      | 22 +++++++--
 .../src/harness/orchestrator/builder.rs            | 25 +++++++++-
 .../src/harness/orchestrator/harness.rs            | 53 ++++++++++++++++++++++
 core/integration/src/harness/port_reserver.rs      |  8 ++++
 core/integration/tests/mod.rs                      |  8 ++++
 .../tests => integration/tests/sdk}/hello_world.rs | 13 ++++--
 core/integration/tests/sdk/mod.rs                  |  2 +
 core/message_bus/src/client_listener/quic.rs       |  2 +-
 core/message_bus/src/client_listener/tcp.rs        |  2 +-
 core/message_bus/src/client_listener/tcp_tls.rs    |  2 +-
 core/message_bus/src/client_listener/ws.rs         |  2 +-
 core/message_bus/src/client_listener/wss.rs        |  2 +-
 core/message_bus/src/replica/io.rs                 | 10 ++--
 core/message_bus/tests/graceful_shutdown.rs        |  4 +-
 core/message_bus/tests/quic_client_roundtrip.rs    |  4 +-
 core/message_bus/tests/tcp_client_roundtrip.rs     |  4 +-
 core/message_bus/tests/tcp_tls_client_listener.rs  |  4 +-
 core/message_bus/tests/ws_client_roundtrip.rs      |  4 +-
 core/message_bus/tests/wss_client_listener.rs      |  4 +-
 core/metadata/src/stm/user.rs                      |  5 ++
 core/sdk/src/quic/quic_client.rs                   |  2 +-
 core/sdk/src/websocket/websocket_client.rs         |  2 +-
 core/server-ng/src/bootstrap.rs                    | 21 +++++----
 27 files changed, 165 insertions(+), 82 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index ffa4b93ec..5bf35b52d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7082,16 +7082,6 @@ dependencies = [
  "zip 8.6.0",
 ]
 
-[[package]]
-name = "integration-vsr"
-version = "0.0.1"
-dependencies = [
- "iggy",
- "integration",
- "serial_test",
- "tokio",
-]
-
 [[package]]
 name = "interpolate_name"
 version = "0.2.4"
diff --git a/Cargo.toml b/Cargo.toml
index 5b06d5f05..6c8cb8434 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -49,7 +49,6 @@ members = [
     "core/consensus",
     "core/harness_derive",
     "core/integration",
-    "core/integration-vsr",
     "core/journal",
     "core/message_bus",
     "core/metadata",
diff --git a/core/integration-vsr/Cargo.toml b/core/integration-vsr/Cargo.toml
deleted file mode 100644
index 4f956b2b9..000000000
--- a/core/integration-vsr/Cargo.toml
+++ /dev/null
@@ -1,30 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[package]
-name = "integration-vsr"
-version = "0.0.1"
-edition = "2024"
-license = "Apache-2.0"
-publish = false
-
-[dependencies]
-iggy = { workspace = true, features = ["vsr"] }
-integration = { path = "../integration" }
-serial_test = { workspace = true }
-tokio = { workspace = true, features = ["full", "test-util"] }
-
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index 9795e41da..373b9268e 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -26,6 +26,7 @@ publish = false
 # inside the docker containers. This is a temporary workaround (hopefully).
 [features]
 ci-qemu = []
+vsr = ["iggy/vsr"]
 
 [dependencies]
 assert_cmd = { workspace = true }
diff --git a/core/integration/src/harness/handle/server.rs b/core/integration/src/harness/handle/server.rs
index 47839b84b..466d6808f 100644
--- a/core/integration/src/harness/handle/server.rs
+++ b/core/integration/src/harness/handle/server.rs
@@ -93,11 +93,23 @@ impl std::fmt::Debug for ServerHandle {
 }
 
 impl ServerHandle {
+    fn default_server_binary() -> &'static str {
+        #[cfg(feature = "vsr")]
+        {
+            "iggy-server-ng"
+        }
+
+        #[cfg(not(feature = "vsr"))]
+        {
+            "iggy-server"
+        }
+    }
+
     fn launched_binary(&self) -> String {
         if let Some(path) = &self.config.executable_path {
             path.clone()
         } else {
-            "iggy-server".to_string()
+            Self::default_server_binary().to_string()
         }
     }
 
@@ -796,16 +808,20 @@ impl TestBinary for ServerHandle {
                 Command::new(path)
             }
         } else {
-            Command::cargo_bin("iggy-server").map_err(|e| TestBinaryError::ProcessSpawn {
+            Command::cargo_bin(Self::default_server_binary()).map_err(|e| {
+                TestBinaryError::ProcessSpawn {
                 binary: launched_binary.clone(),
                 source: std::io::Error::other(e.to_string()),
+            }
             })?
         };
 
         command.env("IGGY_SYSTEM_PATH", data_path.display().to_string());
         command.envs(&self.envs);
 
-        // TODO(hubcio): Remove --follower flag when proper clustering is implemented
+        // Legacy clustering elects node 0 externally and requires explicit followers.
+        // VSR/server-ng elects its own primary and should see symmetric node startup.
+        #[cfg(not(feature = "vsr"))]
         if self.server_id > 0 {
             command.arg("--follower");
         }
diff --git a/core/integration/src/harness/orchestrator/builder.rs b/core/integration/src/harness/orchestrator/builder.rs
index 058bd6c96..0326f5853 100644
--- a/core/integration/src/harness/orchestrator/builder.rs
+++ b/core/integration/src/harness/orchestrator/builder.rs
@@ -250,7 +250,7 @@ fn build_servers(
         return Ok(Vec::new());
     };
 
-    let node_count = cluster_node_count.unwrap_or(1);
+    let node_count = cluster_node_count.unwrap_or_else(default_cluster_node_count);
 
     if node_count == 1 {
         return Ok(vec![ServerHandle::with_config(config, context.clone())]);
@@ -296,6 +296,18 @@ fn build_servers(
     Ok(servers)
 }
 
+fn default_cluster_node_count() -> usize {
+    #[cfg(feature = "vsr")]
+    {
+        3
+    }
+
+    #[cfg(not(feature = "vsr"))]
+    {
+        1
+    }
+}
+
 fn build_cluster_envs(
     node_index: usize,
     cluster_name: &str,
@@ -311,6 +323,11 @@ fn build_cluster_envs(
 
     envs.insert("IGGY_CLUSTER_ENABLED".to_string(), "true".to_string());
     envs.insert("IGGY_CLUSTER_NAME".to_string(), cluster_name.to_string());
+    #[cfg(feature = "vsr")]
+    envs.insert(
+        "IGGY_MESSAGE_BUS_RECONNECT_PERIOD".to_string(),
+        "100ms".to_string(),
+    );
     // Node identity is supplied via `--replica-id` on the command line by
     // ServerHandle::spawn; every cluster env var emitted here is identical
     // across all spawned servers.
@@ -329,6 +346,12 @@ fn build_cluster_envs(
                 tcp.port().to_string(),
             );
         }
+        if let Some(tcp_replica) = addrs.tcp_replica {
+            envs.insert(
+                format!("IGGY_CLUSTER_NODES_{i}_PORTS_TCP_REPLICA"),
+                tcp_replica.port().to_string(),
+            );
+        }
         if let Some(http) = addrs.http {
             envs.insert(
                 format!("IGGY_CLUSTER_NODES_{i}_PORTS_HTTP"),
diff --git a/core/integration/src/harness/orchestrator/harness.rs b/core/integration/src/harness/orchestrator/harness.rs
index 3018cb39f..5fc3ceb71 100644
--- a/core/integration/src/harness/orchestrator/harness.rs
+++ b/core/integration/src/harness/orchestrator/harness.rs
@@ -28,9 +28,15 @@ use crate::harness::handle::{
 use crate::harness::traits::{Restartable, TestBinary};
 use futures::executor::block_on;
 use iggy::prelude::{ClientWrapper, IggyClient};
+#[cfg(feature = "vsr")]
+use iggy_common::Client;
 use iggy_common::TransportProtocol;
 use std::path::Path;
 use std::sync::Arc;
+#[cfg(feature = "vsr")]
+use std::time::{Duration, Instant};
+#[cfg(feature = "vsr")]
+use tokio::time::{sleep, timeout};
 use wiremock::matchers::{method, path};
 use wiremock::{Mock, MockServer, ResponseTemplate};
 
@@ -152,6 +158,8 @@ impl TestHarness {
             server.start()?;
         }
 
+        self.wait_for_cluster_ready().await?;
+
         if let Some(seed_fn) = seed {
             let client = self.tcp_root_client().await?;
             seed_fn(client)
@@ -166,6 +174,51 @@ impl TestHarness {
         Ok(())
     }
 
+    async fn wait_for_cluster_ready(&self) -> Result<(), TestBinaryError> {
+        #[cfg(not(feature = "vsr"))]
+        {
+            Ok(())
+        }
+
+        #[cfg(feature = "vsr")]
+        {
+            if self.servers.len() <= 1 {
+                return Ok(());
+            }
+
+            const CLUSTER_READY_TIMEOUT: Duration = Duration::from_secs(15);
+            const CLUSTER_READY_RETRY_INTERVAL: Duration = Duration::from_millis(200);
+            const LOGIN_ATTEMPT_TIMEOUT: Duration = Duration::from_millis(750);
+
+            let deadline = Instant::now() + CLUSTER_READY_TIMEOUT;
+            let mut last_error = None;
+
+            while Instant::now() < deadline {
+                match timeout(LOGIN_ATTEMPT_TIMEOUT, self.tcp_root_client()).await {
+                    Ok(Ok(client)) => {
+                        let _ = client.disconnect().await;
+                        return Ok(());
+                    }
+                    Ok(Err(error)) => {
+                        last_error = Some(error.to_string());
+                        sleep(CLUSTER_READY_RETRY_INTERVAL).await;
+                    }
+                    Err(_) => {
+                        last_error = Some("login attempt timed out".to_string());
+                        sleep(CLUSTER_READY_RETRY_INTERVAL).await;
+                    }
+                }
+            }
+
+            Err(TestBinaryError::InvalidState {
+                message: format!(
+                    "Timed out waiting for VSR cluster readiness: {}",
+                    last_error.unwrap_or_else(|| "unknown error".to_string())
+                ),
+            })
+        }
+    }
+
     async fn start_dependents(&mut self) -> Result<(), TestBinaryError> {
         for server in &mut self.servers {
             server.start_dependents().await?;
diff --git a/core/integration/src/harness/port_reserver.rs b/core/integration/src/harness/port_reserver.rs
index cc59a6d02..8f658c179 100644
--- a/core/integration/src/harness/port_reserver.rs
+++ b/core/integration/src/harness/port_reserver.rs
@@ -155,6 +155,7 @@ impl ReservedPort {
 /// Pre-allocated ports for all enabled protocols.
 pub struct PortReserver {
     tcp: Option<ReservedPort>,
+    tcp_replica: Option<ReservedPort>,
     quic: Option<ReservedPort>,
     http: Option<ReservedPort>,
     websocket: Option<ReservedPort>,
@@ -184,6 +185,7 @@ impl SinglePortReserver {
 #[derive(Debug, Clone)]
 pub struct ProtocolAddresses {
     pub tcp: Option<SocketAddr>,
+    pub tcp_replica: Option<SocketAddr>,
     pub quic: Option<SocketAddr>,
     pub http: Option<SocketAddr>,
     pub websocket: Option<SocketAddr>,
@@ -226,6 +228,7 @@ impl PortReserver {
         config: &TestServerConfig,
     ) -> Result<Self, TestBinaryError> {
         let tcp = Some(ReservedPort::tcp(ip_kind)?);
+        let tcp_replica = Some(ReservedPort::tcp(ip_kind)?);
 
         let quic = if config.quic_enabled {
             Some(ReservedPort::udp(ip_kind)?)
@@ -247,6 +250,7 @@ impl PortReserver {
 
         Ok(Self {
             tcp,
+            tcp_replica,
             quic,
             http,
             websocket,
@@ -257,6 +261,7 @@ impl PortReserver {
     pub fn addresses(&self) -> ProtocolAddresses {
         ProtocolAddresses {
             tcp: self.tcp.as_ref().map(ReservedPort::addr),
+            tcp_replica: self.tcp_replica.as_ref().map(ReservedPort::addr),
             quic: self.quic.as_ref().map(ReservedPort::addr),
             http: self.http.as_ref().map(ReservedPort::addr),
             websocket: self.websocket.as_ref().map(ReservedPort::addr),
@@ -268,6 +273,9 @@ impl PortReserver {
         if let Some(tcp) = self.tcp {
             tcp.release();
         }
+        if let Some(tcp_replica) = self.tcp_replica {
+            tcp_replica.release();
+        }
         if let Some(quic) = self.quic {
             quic.release();
         }
diff --git a/core/integration/tests/mod.rs b/core/integration/tests/mod.rs
index 7c543a333..4163aa175 100644
--- a/core/integration/tests/mod.rs
+++ b/core/integration/tests/mod.rs
@@ -31,14 +31,22 @@ use tracing_subscriber::layer::SubscriberExt;
 use tracing_subscriber::util::SubscriberInitExt;
 use tracing_subscriber::{EnvFilter, fmt};
 
+#[cfg(not(feature = "vsr"))]
 mod cli;
+#[cfg(not(feature = "vsr"))]
 mod cluster;
+#[cfg(not(feature = "vsr"))]
 mod config_provider;
+#[cfg(not(feature = "vsr"))]
 mod connectors;
+#[cfg(not(feature = "vsr"))]
 mod data_integrity;
+#[cfg(not(feature = "vsr"))]
 mod mcp;
 mod sdk;
+#[cfg(not(feature = "vsr"))]
 mod server;
+#[cfg(not(feature = "vsr"))]
 mod state;
 
 lazy_static! {
diff --git a/core/integration-vsr/tests/hello_world.rs b/core/integration/tests/sdk/hello_world.rs
similarity index 80%
rename from core/integration-vsr/tests/hello_world.rs
rename to core/integration/tests/sdk/hello_world.rs
index 4e2af7d56..0c3746c21 100644
--- a/core/integration-vsr/tests/hello_world.rs
+++ b/core/integration/tests/sdk/hello_world.rs
@@ -19,10 +19,15 @@
 use iggy::prelude::*;
 use integration::iggy_harness;
 
-#[iggy_harness(
-    test_client_transport = [Tcp, WebSocket],
-    server(executable_path = "iggy-server-ng")
-)]
+#[cfg(not(feature = "vsr"))]
+#[iggy_harness]
+async fn hello_world(harness: &TestHarness) {
+    let client = harness.root_client().await.unwrap();
+    client.ping().await.unwrap();
+}
+
+#[cfg(feature = "vsr")]
+#[iggy_harness(test_client_transport = [Tcp, WebSocket])]
 async fn hello_world(harness: &TestHarness) {
     let client = harness.new_client().await.unwrap();
     client
diff --git a/core/integration/tests/sdk/mod.rs b/core/integration/tests/sdk/mod.rs
index 6d94bfa61..09a6a8697 100644
--- a/core/integration/tests/sdk/mod.rs
+++ b/core/integration/tests/sdk/mod.rs
@@ -16,4 +16,6 @@
  * under the License.
  */
 
+mod hello_world;
+#[cfg(not(feature = "vsr"))]
 mod producer;
diff --git a/core/message_bus/src/client_listener/quic.rs b/core/message_bus/src/client_listener/quic.rs
index a4e191a20..806ec502a 100644
--- a/core/message_bus/src/client_listener/quic.rs
+++ b/core/message_bus/src/client_listener/quic.rs
@@ -61,7 +61,7 @@ use tracing::{debug, error, info};
 ///
 /// Returns [`IggyError::CannotBindToSocket`] if the bind fails.
 #[allow(clippy::future_not_send)]
-pub async fn bind(
+pub fn bind(
     addr: SocketAddr,
     server_config: ServerConfig,
 ) -> Result<(Endpoint, SocketAddr), IggyError> {
diff --git a/core/message_bus/src/client_listener/tcp.rs b/core/message_bus/src/client_listener/tcp.rs
index a79368c40..8edbc1c53 100644
--- a/core/message_bus/src/client_listener/tcp.rs
+++ b/core/message_bus/src/client_listener/tcp.rs
@@ -42,7 +42,7 @@ use tracing::{debug, error, info};
 ///
 /// Returns [`IggyError::CannotBindToSocket`] if the bind fails.
 #[allow(clippy::future_not_send)]
-pub async fn bind(addr: SocketAddr) -> Result<(TcpListener, SocketAddr), IggyError> {
+pub fn bind(addr: SocketAddr) -> Result<(TcpListener, SocketAddr), IggyError> {
     let listener = bind_reusable_tcp_listener(addr)
         .map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?;
     let actual = listener
diff --git a/core/message_bus/src/client_listener/tcp_tls.rs b/core/message_bus/src/client_listener/tcp_tls.rs
index 9fe199bab..fce2f8ca2 100644
--- a/core/message_bus/src/client_listener/tcp_tls.rs
+++ b/core/message_bus/src/client_listener/tcp_tls.rs
@@ -71,7 +71,7 @@ use tracing::{debug, error, info};
 ///   from `credentials` (cert / key mismatch).
 /// - [`IggyError::CannotBindToSocket`] if the TCP bind fails.
 #[allow(clippy::future_not_send)]
-pub async fn bind(
+pub fn bind(
     addr: SocketAddr,
     credentials: TlsServerCredentials,
 ) -> Result<(TcpListener, Arc<rustls::ServerConfig>, SocketAddr), IggyError> {
diff --git a/core/message_bus/src/client_listener/ws.rs b/core/message_bus/src/client_listener/ws.rs
index 961bf8e7e..f6863e9e8 100644
--- a/core/message_bus/src/client_listener/ws.rs
+++ b/core/message_bus/src/client_listener/ws.rs
@@ -56,7 +56,7 @@ use tracing::{debug, error, info};
 ///
 /// Returns [`IggyError::CannotBindToSocket`] if the bind fails.
 #[allow(clippy::future_not_send)]
-pub async fn bind(addr: SocketAddr) -> Result<(TcpListener, SocketAddr), IggyError> {
+pub fn bind(addr: SocketAddr) -> Result<(TcpListener, SocketAddr), IggyError> {
     let listener = bind_reusable_tcp_listener(addr)
         .map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?;
     let actual = listener
diff --git a/core/message_bus/src/client_listener/wss.rs b/core/message_bus/src/client_listener/wss.rs
index 96cfd5415..f686c65f8 100644
--- a/core/message_bus/src/client_listener/wss.rs
+++ b/core/message_bus/src/client_listener/wss.rs
@@ -66,7 +66,7 @@ use tracing::{debug, error, info};
 ///   from `credentials` (cert / key mismatch).
 /// - [`IggyError::CannotBindToSocket`] if the TCP bind fails.
 #[allow(clippy::future_not_send)]
-pub async fn bind(
+pub fn bind(
     addr: SocketAddr,
     credentials: TlsServerCredentials,
 ) -> Result<(TcpListener, Arc<rustls::ServerConfig>, SocketAddr), IggyError> {
diff --git a/core/message_bus/src/replica/io.rs b/core/message_bus/src/replica/io.rs
index 3e5da9fbc..0cde3bb12 100644
--- a/core/message_bus/src/replica/io.rs
+++ b/core/message_bus/src/replica/io.rs
@@ -207,7 +207,7 @@ pub async fn start_on_shard_zero(
     );
 
     let (replica_listener, replica_bound) = bind_replica_listener(replica_listen_addr).await?;
-    let (clients_listener, client_bound) = client_listener::tcp::bind(client_listen_addr).await?;
+    let (clients_listener, client_bound) = client_listener::tcp::bind(client_listen_addr)?;
 
     let token_for_replica = bus.token();
     let on_accepted_replica_for_listener = on_accepted_replica.clone();
@@ -236,7 +236,7 @@ pub async fn start_on_shard_zero(
 
     let ws_bound = match (ws_listen_addr, on_accepted_ws_client) {
         (Some(addr), Some(on_accepted_ws)) => {
-            let (ws_listener, ws_bound) = client_listener::ws::bind(addr).await?;
+            let (ws_listener, ws_bound) = client_listener::ws::bind(addr)?;
             let token_for_ws = bus.token();
             let ws_handle = compio::runtime::spawn(async move {
                 client_listener::ws::run(ws_listener, token_for_ws, on_accepted_ws).await;
@@ -257,7 +257,7 @@ pub async fn start_on_shard_zero(
                     .map_err(|e| {
                         IggyError::IoError(format!("QUIC server config build failed: {e}"))
                     })?;
-            let (endpoint, quic_bound) = client_listener::quic::bind(addr, server_config).await?;
+            let (endpoint, quic_bound) = client_listener::quic::bind(addr, server_config)?;
             let token_for_quic = bus.token();
             let handshake_grace = bus.config().handshake_grace;
             let quic_handle = compio::runtime::spawn(async move {
@@ -285,7 +285,7 @@ pub async fn start_on_shard_zero(
     ) {
         (Some(addr), Some(creds), Some(on_accepted_tls)) => {
             let (listener, server_config, tls_bound) =
-                client_listener::tcp_tls::bind(addr, creds).await?;
+                client_listener::tcp_tls::bind(addr, creds)?;
             let token_for_tls = bus.token();
             let tls_handle = compio::runtime::spawn(async move {
                 client_listener::tcp_tls::run(
@@ -308,7 +308,7 @@ pub async fn start_on_shard_zero(
     let wss_bound = match (wss_listen_addr, wss_credentials, on_accepted_wss_client) {
         (Some(addr), Some(creds), Some(on_accepted_wss)) => {
             let (listener, server_config, wss_bound) =
-                client_listener::wss::bind(addr, creds).await?;
+                client_listener::wss::bind(addr, creds)?;
             let token_for_wss = bus.token();
             let wss_handle = compio::runtime::spawn(async move {
                 client_listener::wss::run(listener, server_config, token_for_wss, on_accepted_wss)
diff --git a/core/message_bus/tests/graceful_shutdown.rs b/core/message_bus/tests/graceful_shutdown.rs
index 18038039c..6d9cf3b00 100644
--- a/core/message_bus/tests/graceful_shutdown.rs
+++ b/core/message_bus/tests/graceful_shutdown.rs
@@ -32,7 +32,7 @@ use std::time::Duration;
 async fn drains_all_clients_within_timeout() {
     let bus = Rc::new(IggyMessageBus::new(0));
     let on_request: RequestHandler = Rc::new(|_, _| {});
-    let (listener, addr) = bind(loopback()).await.unwrap();
+    let (listener, addr) = bind(loopback()).unwrap();
 
     let token = bus.token();
     let accept_delegate = install_clients_locally(bus.clone(), on_request);
@@ -87,7 +87,7 @@ async fn drains_all_clients_within_timeout() {
 async fn connection_drain_precedes_slow_background() {
     let bus = Rc::new(IggyMessageBus::new(0));
     let on_request: RequestHandler = Rc::new(|_, _| {});
-    let (listener, addr) = bind(loopback()).await.unwrap();
+    let (listener, addr) = bind(loopback()).unwrap();
 
     let token = bus.token();
     let accept_delegate = install_clients_locally(bus.clone(), on_request);
diff --git a/core/message_bus/tests/quic_client_roundtrip.rs b/core/message_bus/tests/quic_client_roundtrip.rs
index 2a42ba040..1feee8d9f 100644
--- a/core/message_bus/tests/quic_client_roundtrip.rs
+++ b/core/message_bus/tests/quic_client_roundtrip.rs
@@ -83,7 +83,7 @@ async fn request_reply_round_trip() {
     let (cert, key) = self_signed();
     let server_cfg = server_config_with_cert(vec![cert.clone()], key, &QuicTuning::default())
         .expect("server config");
-    let (endpoint, server_addr) = bind(loopback(), server_cfg).await.expect("bind");
+    let (endpoint, server_addr) = bind(loopback(), server_cfg).expect("bind");
 
     let token = bus.token();
     let on_accepted = install_quic_clients_locally(bus.clone(), on_request);
@@ -160,7 +160,7 @@ async fn slow_handshake_does_not_block_subsequent_accept() {
     let (cert, key) = self_signed();
     let server_cfg = server_config_with_cert(vec![cert.clone()], key, &QuicTuning::default())
         .expect("server config");
-    let (endpoint, server_addr) = bind(loopback(), server_cfg).await.expect("bind");
+    let (endpoint, server_addr) = bind(loopback(), server_cfg).expect("bind");
 
     let token = bus.token();
     let on_accepted = install_quic_clients_locally(bus.clone(), on_request);
diff --git a/core/message_bus/tests/tcp_client_roundtrip.rs b/core/message_bus/tests/tcp_client_roundtrip.rs
index 6c8cd5985..b6a22cdf0 100644
--- a/core/message_bus/tests/tcp_client_roundtrip.rs
+++ b/core/message_bus/tests/tcp_client_roundtrip.rs
@@ -49,7 +49,7 @@ async fn request_reply_round_trip() {
         .detach();
     });
 
-    let (listener, addr) = bind(loopback()).await.expect("bind");
+    let (listener, addr) = bind(loopback()).expect("bind");
     let token = bus.token();
     let accept_delegate = install_clients_locally(bus.clone(), on_request);
     let accept_handle = compio::runtime::spawn(async move {
@@ -88,7 +88,7 @@ async fn unexpected_command_is_ignored() {
         let _ = tx.try_send(());
     });
 
-    let (listener, addr) = bind(loopback()).await.unwrap();
+    let (listener, addr) = bind(loopback()).unwrap();
     let token = bus.token();
     let accept_delegate = install_clients_locally(bus.clone(), on_request);
     let accept_handle = compio::runtime::spawn(async move {
diff --git a/core/message_bus/tests/tcp_tls_client_listener.rs b/core/message_bus/tests/tcp_tls_client_listener.rs
index 1621e9923..b217229b2 100644
--- a/core/message_bus/tests/tcp_tls_client_listener.rs
+++ b/core/message_bus/tests/tcp_tls_client_listener.rs
@@ -68,7 +68,7 @@ async fn tcp_tls_client_listener_accepts_and_round_trips() {
     let creds = self_signed_for_loopback();
     let cert_chain = creds.cert_chain.clone();
     let (listener, server_cfg, server_addr) =
-        bind(loopback(), creds).await.expect("tls listener bind");
+        bind(loopback(), creds).expect("tls listener bind");
     let token = bus.token();
     let on_accepted = install_tls_clients_locally(Rc::clone(&bus), on_request);
     let accept_handle = compio::runtime::spawn(async move {
@@ -149,7 +149,7 @@ async fn slow_tls_handshake_evicts_registry() {
 
     let creds = self_signed_for_loopback();
     let (listener, server_cfg, server_addr) =
-        bind(loopback(), creds).await.expect("tls listener bind");
+        bind(loopback(), creds).expect("tls listener bind");
     let token = bus.token();
     let on_accepted = install_tls_clients_locally(Rc::clone(&bus), on_request);
     let accept_handle = compio::runtime::spawn(async move {
diff --git a/core/message_bus/tests/ws_client_roundtrip.rs b/core/message_bus/tests/ws_client_roundtrip.rs
index e936e3394..951c109e9 100644
--- a/core/message_bus/tests/ws_client_roundtrip.rs
+++ b/core/message_bus/tests/ws_client_roundtrip.rs
@@ -67,7 +67,7 @@ async fn handshake_succeeds_and_round_trip_completes() {
         .detach();
     });
 
-    let (listener, server_addr) = bind(loopback()).await.expect("bind");
+    let (listener, server_addr) = bind(loopback()).expect("bind");
     let token = bus.token();
     let on_accepted = install_ws_clients_locally(bus.clone(), on_request);
     let accept_handle = compio::runtime::spawn(async move {
@@ -124,7 +124,7 @@ async fn handshake_succeeds_without_subprotocol_header() {
     let bus = Rc::new(IggyMessageBus::new(0));
     let on_request: RequestHandler = Rc::new(|_, _| {});
 
-    let (listener, server_addr) = bind(loopback()).await.expect("bind");
+    let (listener, server_addr) = bind(loopback()).expect("bind");
     let token = bus.token();
     let on_accepted = install_ws_clients_locally(bus.clone(), on_request);
     let accept_handle = compio::runtime::spawn(async move {
diff --git a/core/message_bus/tests/wss_client_listener.rs b/core/message_bus/tests/wss_client_listener.rs
index cfed3e03a..01a25ef18 100644
--- a/core/message_bus/tests/wss_client_listener.rs
+++ b/core/message_bus/tests/wss_client_listener.rs
@@ -69,7 +69,7 @@ async fn wss_client_listener_accepts_and_round_trips() {
     let creds = self_signed_for_loopback();
     let cert_chain = creds.cert_chain.clone();
     let (listener, server_cfg, server_addr) =
-        bind(loopback(), creds).await.expect("wss listener bind");
+        bind(loopback(), creds).expect("wss listener bind");
     let token = bus.token();
     let on_accepted = install_wss_clients_locally(Rc::clone(&bus), on_request);
     let accept_handle = compio::runtime::spawn(async move {
@@ -148,7 +148,7 @@ async fn slow_handshake_evicts_registry() {
 
     let creds = self_signed_for_loopback();
     let (listener, server_cfg, server_addr) =
-        bind(loopback(), creds).await.expect("wss listener bind");
+        bind(loopback(), creds).expect("wss listener bind");
     let token = bus.token();
     let on_accepted = install_wss_clients_locally(Rc::clone(&bus), on_request);
     let accept_handle = compio::runtime::spawn(async move {
diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs
index 7588e049a..705f48bdb 100644
--- a/core/metadata/src/stm/user.rs
+++ b/core/metadata/src/stm/user.rs
@@ -131,6 +131,11 @@ impl Users {
         self.inner.read(f)
     }
 
+    /// Ensures a root user exists in an empty user set.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `username` is not a valid wire-format username.
     pub fn ensure_root_user(&self, username: &str, password_hash: &str) {
         if self.read(|users| !users.items.is_empty()) {
             return;
diff --git a/core/sdk/src/quic/quic_client.rs b/core/sdk/src/quic/quic_client.rs
index 7217bc319..a4d4a590b 100644
--- a/core/sdk/src/quic/quic_client.rs
+++ b/core/sdk/src/quic/quic_client.rs
@@ -270,7 +270,7 @@ impl QuicClient {
 
         #[cfg(feature = "vsr")]
         {
-            return crate::vsr::decode_response(&buffer);
+            crate::vsr::decode_response(&buffer)
         }
 
         #[cfg(not(feature = "vsr"))]
diff --git a/core/sdk/src/websocket/websocket_client.rs b/core/sdk/src/websocket/websocket_client.rs
index 9199d648b..395c76884 100644
--- a/core/sdk/src/websocket/websocket_client.rs
+++ b/core/sdk/src/websocket/websocket_client.rs
@@ -651,7 +651,7 @@ impl WebSocketClient {
                 response.put_slice(&response_body);
             }
 
-            return crate::vsr::decode_response(&response.freeze());
+            crate::vsr::decode_response(&response.freeze())
         }
 
         #[cfg(not(feature = "vsr"))]
diff --git a/core/server-ng/src/bootstrap.rs b/core/server-ng/src/bootstrap.rs
index 0ad2e6b42..bedd73393 100644
--- a/core/server-ng/src/bootstrap.rs
+++ b/core/server-ng/src/bootstrap.rs
@@ -1223,7 +1223,7 @@ async fn start_manual_runtime(
         None
     };
 
-    let bound_clients = start_client_listeners(shard, config, topology, &accepted_clients).await?;
+    let bound_clients = start_client_listeners(shard, config, topology, &accepted_clients)?;
     write_current_config(
         config,
         Some(topology.self_replica_id),
@@ -1505,7 +1505,13 @@ async fn complete_login_register(
     request_header: &RequestHeader,
     user_id: u32,
 ) -> Result<(), LoginRegisterError> {
-    if let Some((_, session)) = sessions.borrow().get_session(transport_client_id) {
+    let existing_session = {
+        let sessions = sessions.borrow();
+        sessions
+            .get_session(transport_client_id)
+            .map(|(_, session)| session)
+    };
+    if let Some(session) = existing_session {
         let response = LoginRegisterResponse { user_id, session }.to_bytes();
         let reply = build_login_register_reply(request_header, vsr_client_id, session, &response);
         let _ = shard
@@ -1581,13 +1587,14 @@ fn build_login_register_reply(
 ) -> Message<ReplyHeader> {
     let total_size = std::mem::size_of::<ReplyHeader>() + body.len();
     let mut reply = Message::<ReplyHeader>::new(total_size);
+    let header_size = u32::try_from(total_size).expect("reply size must fit into u32");
     let header = bytemuck::checked::try_from_bytes_mut::<ReplyHeader>(
         &mut reply.as_mut_slice()[..std::mem::size_of::<ReplyHeader>()],
     )
     .expect("zeroed bytes are valid");
     *header = ReplyHeader {
         cluster: request_header.cluster,
-        size: total_size as u32,
+        size: header_size,
         view: request_header.view,
         release: request_header.release,
         command: Command2::Reply,
@@ -1747,7 +1754,7 @@ fn mint_client_meta(
     ClientConnMeta::new((shard_id << 112) | seq, peer_addr, transport)
 }
 
-async fn start_client_listeners(
+fn start_client_listeners(
     shard: &Rc<ServerNgShard>,
     config: &ServerNgConfig,
     topology: &TcpTopology,
@@ -1757,7 +1764,6 @@ async fn start_client_listeners(
 
     if config.tcp.enabled && !config.tcp.tls.enabled {
         let (listener, bound_addr) = client_listener::tcp::bind(topology.client_listen_addr)
-            .await
             .map_err(|source| {
                 error!(
                     addr = %topology.client_listen_addr,
@@ -1776,8 +1782,7 @@ async fn start_client_listeners(
     }
 
     if let Some(ws_addr) = topology.ws_listen_addr {
-        let (listener, bound_addr) =
-            client_listener::ws::bind(ws_addr).await.map_err(|source| {
+        let (listener, bound_addr) = client_listener::ws::bind(ws_addr).map_err(|source| {
                 error!(addr = %ws_addr, error = %source, "failed to bind websocket listener");
                 source
             })?;
@@ -1805,7 +1810,6 @@ async fn start_client_listeners(
             source
         })?;
         let (endpoint, bound_addr) = client_listener::quic::bind(quic_addr, server_config)
-            .await
             .map_err(|source| {
                 error!(addr = %quic_addr, error = %source, "failed to bind QUIC listener");
                 source
@@ -1824,7 +1828,6 @@ async fn start_client_listeners(
         let credentials = load_tcp_tls_server_credentials(config)?;
         let (listener, tls_config, bound_addr) =
             client_listener::tcp_tls::bind(topology.client_listen_addr, credentials)
-                .await
                 .map_err(|source| {
                     error!(
                         addr = %topology.client_listen_addr,

Reply via email to