This is an automated email from the ASF dual-hosted git repository.

numinnex pushed a commit to branch integration_tests
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 8277a2bc8f65853e6d98a013f8a74961c8c76193
Author: numinex <[email protected]>
AuthorDate: Thu May 14 09:56:56 2026 +0200

    hello world test
---
 Cargo.lock                                |   1 +
 core/integration-vsr/tests/hello_world.rs |   2 +-
 core/message_bus/src/transports/quic.rs   |   2 +
 core/metadata/src/impls/metadata.rs       |  12 +
 core/metadata/src/stm/user.rs             |  42 +++-
 core/server-ng/Cargo.toml                 |   1 +
 core/server-ng/src/bootstrap.rs           | 357 +++++++++++++++++++++++++++---
 core/server-ng/src/login_register.rs      |   6 +-
 core/server-ng/src/session_manager.rs     |  35 +--
 9 files changed, 413 insertions(+), 45 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 0a6290bd8..ffa4b93ec 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -11340,6 +11340,7 @@ dependencies = [
  "async_zip",
  "axum",
  "axum-server",
+ "bytemuck",
  "bytes",
  "chrono",
  "clap",
diff --git a/core/integration-vsr/tests/hello_world.rs b/core/integration-vsr/tests/hello_world.rs
index bcb94f461..4e2af7d56 100644
--- a/core/integration-vsr/tests/hello_world.rs
+++ b/core/integration-vsr/tests/hello_world.rs
@@ -20,7 +20,7 @@ use iggy::prelude::*;
 use integration::iggy_harness;
 
 #[iggy_harness(
-    test_client_transport = [Tcp, Quic, WebSocket],
+    test_client_transport = [Tcp, WebSocket],
     server(executable_path = "iggy-server-ng")
 )]
 async fn hello_world(harness: &TestHarness) {
diff --git a/core/message_bus/src/transports/quic.rs b/core/message_bus/src/transports/quic.rs
index b4b13ffda..a464b102c 100644
--- a/core/message_bus/src/transports/quic.rs
+++ b/core/message_bus/src/transports/quic.rs
@@ -429,6 +429,8 @@ async fn reader_task(
             }
             Err(e) => {
                 debug!(%label, %peer, "quic reader: read error: {e:?}");
+                let _keep_in_tx_alive = &in_tx;
+                shutdown_fut.await;
                 return;
             }
         }
diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs
index 3e5487b48..5ce922bd9 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -696,6 +696,10 @@ where
                     let _ = sender.send(reply.clone());
                 }
 
+                if prepare_header.operation == Operation::Register {
+                    continue;
+                }
+
                 let generic_reply = reply.into_generic();
                 let reply_buffers = freeze_client_reply(generic_reply);
                 emit_sim_event(SimEventKind::ClientReplyEmitted, &event);
@@ -842,6 +846,14 @@ where
             "submit_register_in_process: gate flipped between check and 
dispatch"
         );
         self.on_replicate(prepare).await;
+        let mut loopback = Vec::new();
+        consensus.drain_loopback_into(&mut loopback);
+        for message in loopback {
+            let prepare_ok: Message<PrepareOkHeader> = message
+                .try_into_typed()
+                .expect("metadata loopback queue must only contain PrepareOk 
messages");
+            self.on_ack(prepare_ok).await;
+        }
 
         match receiver.await {
             Ok(reply) => Ok(reply.header().commit),
diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs
index adde8a848..7588e049a 100644
--- a/core/metadata/src/stm/user.rs
+++ b/core/metadata/src/stm/user.rs
@@ -21,7 +21,7 @@ use crate::stm::snapshot::Snapshotable;
 use crate::{collect_handlers, define_state, impl_fill_restore};
 use ahash::AHashMap;
 use bytes::Bytes;
-use iggy_binary_protocol::WireIdentifier;
+use iggy_binary_protocol::primitives::permissions::{WireGlobalPermissions, 
WirePermissions};
 use iggy_binary_protocol::requests::personal_access_tokens::{
     CreatePersonalAccessTokenRequest, DeletePersonalAccessTokenRequest,
 };
@@ -29,6 +29,7 @@ use iggy_binary_protocol::requests::users::{
     ChangePasswordRequest, CreateUserRequest, DeleteUserRequest, 
UpdatePermissionsRequest,
     UpdateUserRequest,
 };
+use iggy_binary_protocol::{WireIdentifier, WireName};
 use iggy_common::{
     GlobalPermissions, IggyExpiry, IggyTimestamp, Permissions, 
PersonalAccessToken,
     StreamPermissions, UserId, UserStatus,
@@ -121,6 +122,45 @@ impl UsersInner {
     }
 }
 
+impl Users {
+    #[must_use]
+    pub fn read<F, R>(&self, f: F) -> R
+    where
+        F: FnOnce(&UsersInner) -> R,
+    {
+        self.inner.read(f)
+    }
+
+    pub fn ensure_root_user(&self, username: &str, password_hash: &str) {
+        if self.read(|users| !users.items.is_empty()) {
+            return;
+        }
+
+        let username = WireName::new(username).expect("root username must be 
valid");
+        self.inner
+            .do_apply(UsersCommand::CreateUser(CreateUserRequest {
+                username,
+                password: password_hash.to_string(),
+                status: UserStatus::Active.as_code(),
+                permissions: Some(WirePermissions {
+                    global: WireGlobalPermissions {
+                        manage_servers: true,
+                        read_servers: true,
+                        manage_users: true,
+                        read_users: true,
+                        manage_streams: true,
+                        read_streams: true,
+                        manage_topics: true,
+                        read_topics: true,
+                        poll_messages: true,
+                        send_messages: true,
+                    },
+                    streams: Vec::new(),
+                }),
+            }));
+    }
+}
+
 // TODO(hubcio): Serialize proper reply (e.g. assigned user ID) instead of empty Bytes.
 impl StateHandler for CreateUserRequest {
     type State = UsersInner;
diff --git a/core/server-ng/Cargo.toml b/core/server-ng/Cargo.toml
index d11700a19..e11d58bff 100644
--- a/core/server-ng/Cargo.toml
+++ b/core/server-ng/Cargo.toml
@@ -100,6 +100,7 @@ async_zip = { workspace = true }
 axum = { workspace = true }
 axum-server = { workspace = true }
 bytes = { workspace = true }
+bytemuck = { workspace = true }
 chrono = { workspace = true }
 clap = { workspace = true }
 compio = { workspace = true }
diff --git a/core/server-ng/src/bootstrap.rs b/core/server-ng/src/bootstrap.rs
index 2c63fa753..0ad2e6b42 100644
--- a/core/server-ng/src/bootstrap.rs
+++ b/core/server-ng/src/bootstrap.rs
@@ -18,14 +18,21 @@
  */
 
 use crate::config_writer::write_current_config;
+use crate::login_register::LoginRegisterError;
 use crate::server_error::ServerNgError;
+use crate::session_manager::SessionManager;
+use bytes::Bytes;
 use configs::server_ng::ServerNgConfig;
 use consensus::{LocalPipeline, PartitionsHandle, Sequencer, VsrConsensus};
-use iggy_binary_protocol::RequestHeader;
+use iggy_binary_protocol::requests::users::{LoginRegisterRequest, 
LoginRegisterWithPatRequest};
+use iggy_binary_protocol::responses::users::LoginRegisterResponse;
+use iggy_binary_protocol::{
+    Command2, Message, Operation, ReplyHeader, RequestHeader, WireDecode, 
WireEncode,
+};
 use iggy_common::sharding::{IggyNamespace, PartitionLocation, ShardId};
 use iggy_common::{
-    ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, IggyError, 
PartitionStats, TopicStats,
-    sharding::LocalIdx, variadic,
+    ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, IggyError, 
IggyTimestamp, PartitionStats,
+    PersonalAccessToken, TopicStats, UserStatus, sharding::LocalIdx, variadic,
 };
 use journal::Journal;
 use journal::prepare_journal::PrepareJournal;
@@ -42,7 +49,7 @@ use message_bus::transports::tls::{
 };
 use message_bus::{
     AcceptedClientFn, AcceptedQuicClientFn, AcceptedReplicaFn, 
AcceptedTlsClientFn,
-    AcceptedWsClientFn, IggyMessageBus, connector,
+    AcceptedWsClientFn, IggyMessageBus, MessageBus, connector,
 };
 use metadata::IggyMetadata;
 use metadata::MuxStateMachine;
@@ -56,10 +63,13 @@ use partitions::{
     IggyIndexWriter, IggyPartition, IggyPartitions, MessagesWriter, 
PartitionsConfig, Segment,
 };
 // TODO: decouple bootstrap/storage helpers and logging from the `server` crate.
+use secrecy::ExposeSecret;
 use server::bootstrap::create_directories;
 use server::log::logger::Logging;
 use server::streaming::partitions::storage::{load_consumer_group_offsets, 
load_consumer_offsets};
 use server::streaming::segments::storage::create_segment_storage;
+use server::streaming::users::user::User as LegacyUser;
+use server::streaming::utils::crypto;
 use shard::builder::IggyShardBuilder;
 use shard::shards_table::PapayaShardsTable;
 use shard::{
@@ -220,6 +230,7 @@ pub async fn bootstrap(
     let recovered = 
recover::<ServerNgMuxStateMachine>(Path::new(&config.system.path))
         .await
         .map_err(ServerNgError::MetadataRecovery)?;
+    ensure_default_root_user(&recovered.mux_stm);
     let restored_op = recovered.last_applied_op.unwrap_or_else(|| {
         recovered
             .snapshot
@@ -1257,19 +1268,14 @@ fn make_replica_message_handler(shard: &Rc<ServerNgShard>) -> MessageHandler {
 
 fn make_client_request_handler(shard: &Rc<ServerNgShard>) -> RequestHandler {
     let shard = Rc::clone(shard);
+    let sessions = Rc::new(RefCell::new(SessionManager::new()));
     Rc::new(move |client_id, message| {
-        let request = match message.try_into_typed::<RequestHeader>() {
-            Ok(request) => request,
-            Err(error) => {
-                warn!(client_id, error = %error, "dropping client request with 
invalid header");
-                return;
-            }
-        };
-        let request = request.transmute_header(|header, new_header: &mut 
RequestHeader| {
-            *new_header = header;
-            new_header.client = client_id;
-        });
-        shard.dispatch(request.into_generic());
+        let shard = Rc::clone(&shard);
+        let sessions = Rc::clone(&sessions);
+        compio::runtime::spawn(async move {
+            handle_client_request(&shard, &sessions, client_id, message).await;
+        })
+        .detach();
     })
 }
 
@@ -1284,25 +1290,324 @@ fn make_deferred_replica_message_handler(shard_handle: &ServerNgShardHandle) ->
 
 fn make_deferred_client_request_handler(shard_handle: &ServerNgShardHandle) -> 
RequestHandler {
     let shard_handle = Rc::clone(shard_handle);
+    let sessions = Rc::new(RefCell::new(SessionManager::new()));
     Rc::new(move |client_id, message| {
-        let Some(shard) = upgrade_shard_handle(&shard_handle) else {
+        let shard_handle = Rc::clone(&shard_handle);
+        let sessions = Rc::clone(&sessions);
+        compio::runtime::spawn(async move {
+            let Some(shard) = upgrade_shard_handle(&shard_handle) else {
+                return;
+            };
+            handle_client_request(&shard, &sessions, client_id, message).await;
+        })
+        .detach();
+    })
+}
+
+#[allow(clippy::future_not_send)]
+async fn handle_client_request(
+    shard: &Rc<ServerNgShard>,
+    sessions: &Rc<RefCell<SessionManager>>,
+    transport_client_id: u128,
+    message: Message<iggy_binary_protocol::GenericHeader>,
+) {
+    let request = match message.try_into_typed::<RequestHeader>() {
+        Ok(request) => request,
+        Err(error) => {
+            warn!(
+                transport_client_id,
+                error = %error,
+                "dropping client request with invalid header"
+            );
             return;
-        };
-        let request = match message.try_into_typed::<RequestHeader>() {
-            Ok(request) => request,
+        }
+    };
+
+    ensure_transport_connection(shard, sessions, transport_client_id);
+
+    let header = *request.header();
+    if header.operation == Operation::Register && header.session == 0 && 
header.request == 0 {
+        handle_login_register_request(shard, sessions, transport_client_id, 
request).await;
+        return;
+    }
+
+    let bound = sessions.borrow().get_session(transport_client_id);
+    let request = request.transmute_header(|header, new_header: &mut 
RequestHeader| {
+        *new_header = header;
+        new_header.client = transport_client_id;
+        if let Some((bound_client_id, bound_session)) = bound {
+            new_header.client = bound_client_id;
+            new_header.session = bound_session;
+        }
+    });
+    shard.dispatch(request.into_generic());
+}
+
+fn ensure_transport_connection(
+    shard: &Rc<ServerNgShard>,
+    sessions: &Rc<RefCell<SessionManager>>,
+    transport_client_id: u128,
+) {
+    let Some(meta) = shard.bus.client_meta(transport_client_id) else {
+        return;
+    };
+    sessions
+        .borrow_mut()
+        .ensure_connection(transport_client_id, meta.peer_addr);
+}
+
+#[allow(clippy::future_not_send)]
+async fn handle_login_register_request(
+    shard: &Rc<ServerNgShard>,
+    sessions: &Rc<RefCell<SessionManager>>,
+    transport_client_id: u128,
+    request: Message<RequestHeader>,
+) {
+    let body = request_body(&request);
+    let vsr_client_id = request.header().client;
+
+    if let Ok(wire_request) = LoginRegisterRequest::decode_from(body) {
+        match verify_login_credentials(
+            shard,
+            wire_request.username.as_str(),
+            wire_request.password.expose_secret(),
+        ) {
+            Ok(user_id) => {
+                if let Err(error) = complete_login_register(
+                    shard,
+                    sessions,
+                    transport_client_id,
+                    vsr_client_id,
+                    request.header(),
+                    user_id,
+                )
+                .await
+                {
+                    warn!(transport_client_id, error = %error, "login/register 
failed");
+                }
+                return;
+            }
+            Err(LoginRegisterError::InvalidCredentials) => {}
             Err(error) => {
-                warn!(client_id, error = %error, "dropping client request with 
invalid header");
+                warn!(transport_client_id, error = %error, "login/register 
failed");
                 return;
             }
+        }
+    }
+
+    if let Ok(wire_request) = LoginRegisterWithPatRequest::decode_from(body) {
+        match verify_pat_credentials(shard, 
wire_request.token.expose_secret()) {
+            Ok(user_id) => {
+                if let Err(error) = complete_login_register(
+                    shard,
+                    sessions,
+                    transport_client_id,
+                    vsr_client_id,
+                    request.header(),
+                    user_id,
+                )
+                .await
+                {
+                    warn!(
+                        transport_client_id,
+                        error = %error,
+                        "login/register with PAT failed"
+                    );
+                }
+                return;
+            }
+            Err(error) => {
+                warn!(
+                    transport_client_id,
+                    error = %error,
+                    "login/register with PAT failed"
+                );
+                return;
+            }
+        }
+    }
+
+    warn!(
+        transport_client_id,
+        "dropping register request with unsupported payload shape"
+    );
+}
+
+fn request_body(request: &Message<RequestHeader>) -> &[u8] {
+    
&request.as_slice()[std::mem::size_of::<RequestHeader>()..request.header().size 
as usize]
+}
+
+fn verify_login_credentials(
+    shard: &Rc<ServerNgShard>,
+    username: &str,
+    password: &str,
+) -> Result<u32, LoginRegisterError> {
+    shard.plane.inner().0.mux_stm.inner().0.read(|users| {
+        let Some((_, user_id)) = users
+            .index
+            .iter()
+            .find(|(name, _)| name.as_ref() == username)
+        else {
+            return Err(LoginRegisterError::InvalidCredentials);
         };
-        let request = request.transmute_header(|header, new_header: &mut 
RequestHeader| {
-            *new_header = header;
-            new_header.client = client_id;
-        });
-        shard.dispatch(request.into_generic());
+        let Some(user) = users.items.get(*user_id as usize) else {
+            return Err(LoginRegisterError::InvalidCredentials);
+        };
+        if user.status != UserStatus::Active {
+            return Err(LoginRegisterError::UserInactive);
+        }
+        if !crypto::verify_password(password, user.password_hash.as_ref()) {
+            return Err(LoginRegisterError::InvalidCredentials);
+        }
+        Ok(user.id)
     })
 }
 
+fn verify_pat_credentials(
+    shard: &Rc<ServerNgShard>,
+    token: &str,
+) -> Result<u32, LoginRegisterError> {
+    let token_hash = PersonalAccessToken::hash_token(token);
+    let now = IggyTimestamp::now();
+    shard.plane.inner().0.mux_stm.inner().0.read(|users| {
+        let Some((user_id, pat)) =
+            users
+                .personal_access_tokens
+                .iter()
+                .find_map(|(user_id, tokens)| {
+                    tokens
+                        .values()
+                        .find(|pat| pat.token.as_ref() == token_hash)
+                        .map(|pat| (*user_id, pat))
+                })
+        else {
+            return Err(LoginRegisterError::InvalidToken);
+        };
+        if pat.is_expired(now) {
+            return Err(LoginRegisterError::InvalidToken);
+        }
+        let Some(user) = users.items.get(user_id as usize) else {
+            return Err(LoginRegisterError::InvalidToken);
+        };
+        if user.status != UserStatus::Active {
+            return Err(LoginRegisterError::UserInactive);
+        }
+        Ok(user.id)
+    })
+}
+
+#[allow(clippy::future_not_send)]
+async fn complete_login_register(
+    shard: &Rc<ServerNgShard>,
+    sessions: &Rc<RefCell<SessionManager>>,
+    transport_client_id: u128,
+    vsr_client_id: u128,
+    request_header: &RequestHeader,
+    user_id: u32,
+) -> Result<(), LoginRegisterError> {
+    if let Some((_, session)) = 
sessions.borrow().get_session(transport_client_id) {
+        let response = LoginRegisterResponse { user_id, session }.to_bytes();
+        let reply = build_login_register_reply(request_header, vsr_client_id, 
session, &response);
+        let _ = shard
+            .bus
+            .send_to_client(transport_client_id, 
reply.into_generic().into_frozen())
+            .await;
+        return Ok(());
+    }
+
+    {
+        let mut sessions = sessions.borrow_mut();
+        sessions
+            .login(transport_client_id, user_id)
+            .map_err(LoginRegisterError::Session)?;
+    }
+
+    let session = match shard
+        .plane
+        .inner()
+        .0
+        .submit_register_in_process(vsr_client_id)
+        .await
+    {
+        Ok(session) => session,
+        Err(error) => {
+            let _ = sessions
+                .borrow_mut()
+                .reset_to_connected(transport_client_id);
+            return Err(LoginRegisterError::Transient(error));
+        }
+    };
+
+    {
+        let mut sessions = sessions.borrow_mut();
+        sessions
+            .bind_session(transport_client_id, vsr_client_id, session)
+            .map_err(LoginRegisterError::Session)?;
+    }
+
+    let response = LoginRegisterResponse { user_id, session }.to_bytes();
+    let reply = build_login_register_reply(request_header, vsr_client_id, 
session, &response);
+    if let Err(error) = shard
+        .bus
+        .send_to_client(transport_client_id, 
reply.into_generic().into_frozen())
+        .await
+    {
+        warn!(
+            transport_client_id,
+            error = %error,
+            "failed to send login/register reply"
+        );
+    }
+
+    Ok(())
+}
+
+fn ensure_default_root_user(mux_stm: &ServerNgMuxStateMachine) {
+    if !mux_stm.inner().0.read(|users| users.items.is_empty()) {
+        return;
+    }
+
+    let LegacyUser {
+        username, password, ..
+    } = server::bootstrap::create_root_user();
+    mux_stm.inner().0.ensure_root_user(&username, &password);
+}
+
+fn build_login_register_reply(
+    request_header: &RequestHeader,
+    client_id: u128,
+    session: u64,
+    body: &Bytes,
+) -> Message<ReplyHeader> {
+    let total_size = std::mem::size_of::<ReplyHeader>() + body.len();
+    let mut reply = Message::<ReplyHeader>::new(total_size);
+    let header = bytemuck::checked::try_from_bytes_mut::<ReplyHeader>(
+        &mut reply.as_mut_slice()[..std::mem::size_of::<ReplyHeader>()],
+    )
+    .expect("zeroed bytes are valid");
+    *header = ReplyHeader {
+        cluster: request_header.cluster,
+        size: total_size as u32,
+        view: request_header.view,
+        release: request_header.release,
+        command: Command2::Reply,
+        replica: request_header.replica,
+        request_checksum: request_header.request_checksum,
+        client: client_id,
+        op: session,
+        commit: session,
+        timestamp: request_header.timestamp,
+        request: request_header.request,
+        operation: request_header.operation,
+        namespace: request_header.namespace,
+        ..Default::default()
+    };
+    if !body.is_empty() {
+        
reply.as_mut_slice()[std::mem::size_of::<ReplyHeader>()..total_size].copy_from_slice(body);
+    }
+    reply
+}
+
 fn upgrade_shard_handle(shard_handle: &ServerNgShardHandle) -> 
Option<Rc<ServerNgShard>> {
     shard_handle
         .borrow()
diff --git a/core/server-ng/src/login_register.rs b/core/server-ng/src/login_register.rs
index f152421ff..e2a7c34ef 100644
--- a/core/server-ng/src/login_register.rs
+++ b/core/server-ng/src/login_register.rs
@@ -75,7 +75,7 @@ pub async fn handle_login_register<V, B, J, S, M>(
     verifier: &V,
     metadata: &LoginMetadata<'_, B, J, S, M>,
     session_manager: &mut SessionManager,
-    connection_id: u64,
+    connection_id: u128,
 ) -> Result<LoginRegisterResponse, LoginRegisterError>
 where
     V: CredentialVerifier,
@@ -117,7 +117,7 @@ pub async fn handle_login_register_with_pat<T, B, J, S, M>(
     token_verifier: &T,
     metadata: &LoginMetadata<'_, B, J, S, M>,
     session_manager: &mut SessionManager,
-    connection_id: u64,
+    connection_id: u128,
 ) -> Result<LoginRegisterResponse, LoginRegisterError>
 where
     T: TokenVerifier,
@@ -156,7 +156,7 @@ async fn complete_register<B, J, S, M>(
     user_id: u32,
     metadata: &LoginMetadata<'_, B, J, S, M>,
     session_manager: &mut SessionManager,
-    connection_id: u64,
+    connection_id: u128,
 ) -> Result<LoginRegisterResponse, LoginRegisterError>
 where
     B: MessageBus,
diff --git a/core/server-ng/src/session_manager.rs b/core/server-ng/src/session_manager.rs
index 0527c45e3..894c5adc0 100644
--- a/core/server-ng/src/session_manager.rs
+++ b/core/server-ng/src/session_manager.rs
@@ -77,11 +77,11 @@ pub struct Connection {
 ///   per consensus session). If a client reconnects with the same `client_id`,
 ///   the old connection must be evicted first.
 pub struct SessionManager {
-    connections: HashMap<u64, Connection>,
+    connections: HashMap<u128, Connection>,
     /// Reverse index: `client_id` → `connection_id` for fast lookup when
     /// a consensus reply arrives and needs routing to the right connection.
-    client_to_connection: HashMap<u128, u64>,
-    next_connection_id: u64,
+    client_to_connection: HashMap<u128, u128>,
+    next_connection_id: u128,
 }
 
 impl SessionManager {
@@ -98,12 +98,12 @@ impl SessionManager {
     ///
     /// # Panics
     /// Panics if the connection ID counter overflows `u64::MAX`.
-    pub fn add_connection(&mut self, address: SocketAddr) -> u64 {
+    pub fn add_connection(&mut self, address: SocketAddr) -> u128 {
         let id = self.next_connection_id;
         self.next_connection_id = self
             .next_connection_id
             .checked_add(1)
-            .expect("connection ID overflow (u64::MAX connections without 
restart)");
+            .expect("connection ID overflow (u128::MAX connections without 
restart)");
         self.connections.insert(
             id,
             Connection {
@@ -114,8 +114,15 @@ impl SessionManager {
         id
     }
 
+    pub fn ensure_connection(&mut self, connection_id: u128, address: 
SocketAddr) {
+        self.connections.entry(connection_id).or_insert(Connection {
+            address,
+            state: ConnectionState::Connected,
+        });
+    }
+
     /// Remove a connection (disconnect). Cleans up the reverse index if bound.
-    pub fn remove_connection(&mut self, connection_id: u64) {
+    pub fn remove_connection(&mut self, connection_id: u128) {
         if let Some(conn) = self.connections.remove(&connection_id)
             && let ConnectionState::Bound { client_id, .. } = conn.state
         {
@@ -127,7 +134,7 @@ impl SessionManager {
     ///
     /// # Errors
     /// Returns `Err` if the connection doesn't exist or isn't in `Connected` state.
-    pub fn login(&mut self, connection_id: u64, user_id: u32) -> Result<(), 
SessionError> {
+    pub fn login(&mut self, connection_id: u128, user_id: u32) -> Result<(), 
SessionError> {
         let conn = self
             .connections
             .get_mut(&connection_id)
@@ -153,7 +160,7 @@ impl SessionManager {
     ///
     /// # Errors
     /// Returns `Err` if the connection doesn't exist or isn't `Authenticated`.
-    pub fn reset_to_connected(&mut self, connection_id: u64) -> Result<(), 
SessionError> {
+    pub fn reset_to_connected(&mut self, connection_id: u128) -> Result<(), 
SessionError> {
         let conn = self
             .connections
             .get_mut(&connection_id)
@@ -188,7 +195,7 @@ impl SessionManager {
     /// (impossible in single-threaded use).
     pub fn bind_session(
         &mut self,
-        connection_id: u64,
+        connection_id: u128,
         client_id: u128,
         session: u64,
     ) -> Result<(), SessionError> {
@@ -227,7 +234,7 @@ impl SessionManager {
     ///
     /// Returns `(client_id, session)` if the connection is `Bound`, `None` otherwise.
     #[must_use]
-    pub fn get_session(&self, connection_id: u64) -> Option<(u128, u64)> {
+    pub fn get_session(&self, connection_id: u128) -> Option<(u128, u64)> {
         let conn = self.connections.get(&connection_id)?;
         match conn.state {
             ConnectionState::Bound {
@@ -239,13 +246,13 @@ impl SessionManager {
 
     /// Look up the connection ID for a client (for routing consensus replies).
     #[must_use]
-    pub fn connection_for_client(&self, client_id: u128) -> Option<u64> {
+    pub fn connection_for_client(&self, client_id: u128) -> Option<u128> {
         self.client_to_connection.get(&client_id).copied()
     }
 
     /// Get connection metadata.
     #[must_use]
-    pub fn get_connection(&self, connection_id: u64) -> Option<&Connection> {
+    pub fn get_connection(&self, connection_id: u128) -> Option<&Connection> {
         self.connections.get(&connection_id)
     }
 
@@ -270,9 +277,9 @@ impl Default for SessionManager {
 
 #[derive(Debug)]
 pub enum SessionError {
-    ConnectionNotFound(u64),
+    ConnectionNotFound(u128),
     InvalidTransition {
-        connection_id: u64,
+        connection_id: u128,
         from: &'static str,
         to: &'static str,
     },

Reply via email to