This is an automated email from the ASF dual-hosted git repository. numinnex pushed a commit to branch integration_tests in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 8277a2bc8f65853e6d98a013f8a74961c8c76193 Author: numinex <[email protected]> AuthorDate: Thu May 14 09:56:56 2026 +0200 hello world test --- Cargo.lock | 1 + core/integration-vsr/tests/hello_world.rs | 2 +- core/message_bus/src/transports/quic.rs | 2 + core/metadata/src/impls/metadata.rs | 12 + core/metadata/src/stm/user.rs | 42 +++- core/server-ng/Cargo.toml | 1 + core/server-ng/src/bootstrap.rs | 357 +++++++++++++++++++++++++++--- core/server-ng/src/login_register.rs | 6 +- core/server-ng/src/session_manager.rs | 35 +-- 9 files changed, 413 insertions(+), 45 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0a6290bd8..ffa4b93ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11340,6 +11340,7 @@ dependencies = [ "async_zip", "axum", "axum-server", + "bytemuck", "bytes", "chrono", "clap", diff --git a/core/integration-vsr/tests/hello_world.rs b/core/integration-vsr/tests/hello_world.rs index bcb94f461..4e2af7d56 100644 --- a/core/integration-vsr/tests/hello_world.rs +++ b/core/integration-vsr/tests/hello_world.rs @@ -20,7 +20,7 @@ use iggy::prelude::*; use integration::iggy_harness; #[iggy_harness( - test_client_transport = [Tcp, Quic, WebSocket], + test_client_transport = [Tcp, WebSocket], server(executable_path = "iggy-server-ng") )] async fn hello_world(harness: &TestHarness) { diff --git a/core/message_bus/src/transports/quic.rs b/core/message_bus/src/transports/quic.rs index b4b13ffda..a464b102c 100644 --- a/core/message_bus/src/transports/quic.rs +++ b/core/message_bus/src/transports/quic.rs @@ -429,6 +429,8 @@ async fn reader_task( } Err(e) => { debug!(%label, %peer, "quic reader: read error: {e:?}"); + let _keep_in_tx_alive = &in_tx; + shutdown_fut.await; return; } } diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs index 3e5487b48..5ce922bd9 100644 --- a/core/metadata/src/impls/metadata.rs +++ b/core/metadata/src/impls/metadata.rs @@ -696,6 +696,10 @@ where let _ = sender.send(reply.clone()); } + if prepare_header.operation == Operation::Register { + continue; + } + let generic_reply = reply.into_generic(); let reply_buffers = freeze_client_reply(generic_reply); emit_sim_event(SimEventKind::ClientReplyEmitted, &event); @@ -842,6 +846,14 @@ where "submit_register_in_process: gate flipped between check and dispatch" ); self.on_replicate(prepare).await; + let mut loopback = Vec::new(); + consensus.drain_loopback_into(&mut loopback); + for message in loopback { + let prepare_ok: Message<PrepareOkHeader> = message + .try_into_typed() + .expect("metadata loopback queue must only contain PrepareOk messages"); + self.on_ack(prepare_ok).await; + } match receiver.await { Ok(reply) => Ok(reply.header().commit), diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs index adde8a848..7588e049a 100644 --- a/core/metadata/src/stm/user.rs +++ b/core/metadata/src/stm/user.rs @@ -21,7 +21,7 @@ use crate::stm::snapshot::Snapshotable; use crate::{collect_handlers, define_state, impl_fill_restore}; use ahash::AHashMap; use bytes::Bytes; -use iggy_binary_protocol::WireIdentifier; +use iggy_binary_protocol::primitives::permissions::{WireGlobalPermissions, WirePermissions}; use iggy_binary_protocol::requests::personal_access_tokens::{ CreatePersonalAccessTokenRequest, DeletePersonalAccessTokenRequest, }; @@ -29,6 +29,7 @@ use iggy_binary_protocol::requests::users::{ ChangePasswordRequest, CreateUserRequest, DeleteUserRequest, UpdatePermissionsRequest, UpdateUserRequest, }; +use iggy_binary_protocol::{WireIdentifier, WireName}; use iggy_common::{ GlobalPermissions, IggyExpiry, IggyTimestamp, Permissions, PersonalAccessToken, StreamPermissions, UserId, UserStatus, @@ -121,6 +122,45 @@ impl UsersInner { } } +impl Users { + #[must_use] + pub fn read<F, R>(&self, f: F) -> R + where + F: FnOnce(&UsersInner) -> R, + { + self.inner.read(f) + } + + pub fn ensure_root_user(&self, username: &str, password_hash: &str) { + if self.read(|users| !users.items.is_empty()) { + return; + } + + let username = WireName::new(username).expect("root username must be valid"); + self.inner + .do_apply(UsersCommand::CreateUser(CreateUserRequest { + username, + password: password_hash.to_string(), + status: UserStatus::Active.as_code(), + permissions: Some(WirePermissions { + global: WireGlobalPermissions { + manage_servers: true, + read_servers: true, + manage_users: true, + read_users: true, + manage_streams: true, + read_streams: true, + manage_topics: true, + read_topics: true, + poll_messages: true, + send_messages: true, + }, + streams: Vec::new(), + }), + })); + } +} + // TODO(hubcio): Serialize proper reply (e.g. assigned user ID) instead of empty Bytes. impl StateHandler for CreateUserRequest { type State = UsersInner; diff --git a/core/server-ng/Cargo.toml b/core/server-ng/Cargo.toml index d11700a19..e11d58bff 100644 --- a/core/server-ng/Cargo.toml +++ b/core/server-ng/Cargo.toml @@ -100,6 +100,7 @@ async_zip = { workspace = true } axum = { workspace = true } axum-server = { workspace = true } bytes = { workspace = true } +bytemuck = { workspace = true } chrono = { workspace = true } clap = { workspace = true } compio = { workspace = true } diff --git a/core/server-ng/src/bootstrap.rs b/core/server-ng/src/bootstrap.rs index 2c63fa753..0ad2e6b42 100644 --- a/core/server-ng/src/bootstrap.rs +++ b/core/server-ng/src/bootstrap.rs @@ -18,14 +18,21 @@ */ use crate::config_writer::write_current_config; +use crate::login_register::LoginRegisterError; use crate::server_error::ServerNgError; +use crate::session_manager::SessionManager; +use bytes::Bytes; use configs::server_ng::ServerNgConfig; use consensus::{LocalPipeline, PartitionsHandle, Sequencer, VsrConsensus}; -use iggy_binary_protocol::RequestHeader; +use iggy_binary_protocol::requests::users::{LoginRegisterRequest, LoginRegisterWithPatRequest}; +use iggy_binary_protocol::responses::users::LoginRegisterResponse; +use iggy_binary_protocol::{ + Command2, Message, Operation, ReplyHeader, RequestHeader, WireDecode, WireEncode, +}; use iggy_common::sharding::{IggyNamespace, PartitionLocation, ShardId}; use iggy_common::{ - ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, IggyError, PartitionStats, TopicStats, - sharding::LocalIdx, variadic, + ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, IggyError, IggyTimestamp, PartitionStats, + PersonalAccessToken, TopicStats, UserStatus, sharding::LocalIdx, variadic, }; use journal::Journal; use journal::prepare_journal::PrepareJournal; @@ -42,7 +49,7 @@ use message_bus::transports::tls::{ }; use message_bus::{ AcceptedClientFn, AcceptedQuicClientFn, AcceptedReplicaFn, AcceptedTlsClientFn, - AcceptedWsClientFn, IggyMessageBus, connector, + AcceptedWsClientFn, IggyMessageBus, MessageBus, connector, }; use metadata::IggyMetadata; use metadata::MuxStateMachine; @@ -56,10 +63,13 @@ use partitions::{ IggyIndexWriter, IggyPartition, IggyPartitions, MessagesWriter, PartitionsConfig, Segment, }; // TODO: decouple bootstrap/storage helpers and logging from the `server` crate. +use secrecy::ExposeSecret; use server::bootstrap::create_directories; use server::log::logger::Logging; use server::streaming::partitions::storage::{load_consumer_group_offsets, load_consumer_offsets}; use server::streaming::segments::storage::create_segment_storage; +use server::streaming::users::user::User as LegacyUser; +use server::streaming::utils::crypto; use shard::builder::IggyShardBuilder; use shard::shards_table::PapayaShardsTable; use shard::{ @@ -220,6 +230,7 @@ pub async fn bootstrap( let recovered = recover::<ServerNgMuxStateMachine>(Path::new(&config.system.path)) .await .map_err(ServerNgError::MetadataRecovery)?; + ensure_default_root_user(&recovered.mux_stm); let restored_op = recovered.last_applied_op.unwrap_or_else(|| { recovered .snapshot @@ -1257,19 +1268,14 @@ fn make_replica_message_handler(shard: &Rc<ServerNgShard>) -> MessageHandler { fn make_client_request_handler(shard: &Rc<ServerNgShard>) -> RequestHandler { let shard = Rc::clone(shard); + let sessions = Rc::new(RefCell::new(SessionManager::new())); Rc::new(move |client_id, message| { - let request = match message.try_into_typed::<RequestHeader>() { - Ok(request) => request, - Err(error) => { - warn!(client_id, error = %error, "dropping client request with invalid header"); - return; - } - }; - let request = request.transmute_header(|header, new_header: &mut RequestHeader| { - *new_header = header; - new_header.client = client_id; - }); - shard.dispatch(request.into_generic()); + let shard = Rc::clone(&shard); + let sessions = Rc::clone(&sessions); + compio::runtime::spawn(async move { + handle_client_request(&shard, &sessions, client_id, message).await; + }) + .detach(); }) } @@ -1284,25 +1290,324 @@ fn make_deferred_replica_message_handler(shard_handle: &ServerNgShardHandle) -> fn make_deferred_client_request_handler(shard_handle: &ServerNgShardHandle) -> RequestHandler { let shard_handle = Rc::clone(shard_handle); + let sessions = Rc::new(RefCell::new(SessionManager::new())); Rc::new(move |client_id, message| { - let Some(shard) = upgrade_shard_handle(&shard_handle) else { + let shard_handle = Rc::clone(&shard_handle); + let sessions = Rc::clone(&sessions); + compio::runtime::spawn(async move { + let Some(shard) = upgrade_shard_handle(&shard_handle) else { + return; + }; + handle_client_request(&shard, &sessions, client_id, message).await; + }) + .detach(); + }) +} + +#[allow(clippy::future_not_send)] +async fn handle_client_request( + shard: &Rc<ServerNgShard>, + sessions: &Rc<RefCell<SessionManager>>, + transport_client_id: u128, + message: Message<iggy_binary_protocol::GenericHeader>, +) { + let request = match message.try_into_typed::<RequestHeader>() { + Ok(request) => request, + Err(error) => { + warn!( + transport_client_id, + error = %error, + "dropping client request with invalid header" + ); return; - }; - let request = match message.try_into_typed::<RequestHeader>() { - Ok(request) => request, + } + }; + + ensure_transport_connection(shard, sessions, transport_client_id); + + let header = *request.header(); + if header.operation == Operation::Register && header.session == 0 && header.request == 0 { + handle_login_register_request(shard, sessions, transport_client_id, request).await; + return; + } + + let bound = sessions.borrow().get_session(transport_client_id); + let request = request.transmute_header(|header, new_header: &mut RequestHeader| { + *new_header = header; + new_header.client = transport_client_id; + if let Some((bound_client_id, bound_session)) = bound { + new_header.client = bound_client_id; + new_header.session = bound_session; + } + }); + shard.dispatch(request.into_generic()); +} + +fn ensure_transport_connection( + shard: &Rc<ServerNgShard>, + sessions: &Rc<RefCell<SessionManager>>, + transport_client_id: u128, +) { + let Some(meta) = shard.bus.client_meta(transport_client_id) else { + return; + }; + sessions + .borrow_mut() + .ensure_connection(transport_client_id, meta.peer_addr); +} + +#[allow(clippy::future_not_send)] +async fn handle_login_register_request( + shard: &Rc<ServerNgShard>, + sessions: &Rc<RefCell<SessionManager>>, + transport_client_id: u128, + request: Message<RequestHeader>, +) { + let body = request_body(&request); + let vsr_client_id = request.header().client; + + if let Ok(wire_request) = LoginRegisterRequest::decode_from(body) { + match verify_login_credentials( + shard, + wire_request.username.as_str(), + wire_request.password.expose_secret(), + ) { + Ok(user_id) => { + if let Err(error) = complete_login_register( + shard, + sessions, + transport_client_id, + vsr_client_id, + request.header(), + user_id, + ) + .await + { + warn!(transport_client_id, error = %error, "login/register failed"); + } + return; + } + Err(LoginRegisterError::InvalidCredentials) => {} Err(error) => { - warn!(client_id, error = %error, "dropping client request with invalid header"); + warn!(transport_client_id, error = %error, "login/register failed"); return; } + } + } + + if let Ok(wire_request) = LoginRegisterWithPatRequest::decode_from(body) { + match verify_pat_credentials(shard, wire_request.token.expose_secret()) { + Ok(user_id) => { + if let Err(error) = complete_login_register( + shard, + sessions, + transport_client_id, + vsr_client_id, + request.header(), + user_id, + ) + .await + { + warn!( + transport_client_id, + error = %error, + "login/register with PAT failed" + ); + } + return; + } + Err(error) => { + warn!( + transport_client_id, + error = %error, + "login/register with PAT failed" + ); + return; + } + } + } + + warn!( + transport_client_id, + "dropping register request with unsupported payload shape" + ); +} + +fn request_body(request: &Message<RequestHeader>) -> &[u8] { + &request.as_slice()[std::mem::size_of::<RequestHeader>()..request.header().size as usize] +} + +fn verify_login_credentials( + shard: &Rc<ServerNgShard>, + username: &str, + password: &str, +) -> Result<u32, LoginRegisterError> { + shard.plane.inner().0.mux_stm.inner().0.read(|users| { + let Some((_, user_id)) = users + .index + .iter() + .find(|(name, _)| name.as_ref() == username) + else { + return Err(LoginRegisterError::InvalidCredentials); }; - let request = request.transmute_header(|header, new_header: &mut RequestHeader| { - *new_header = header; - new_header.client = client_id; - }); - shard.dispatch(request.into_generic()); + let Some(user) = users.items.get(*user_id as usize) else { + return Err(LoginRegisterError::InvalidCredentials); + }; + if user.status != UserStatus::Active { + return Err(LoginRegisterError::UserInactive); + } + if !crypto::verify_password(password, user.password_hash.as_ref()) { + return Err(LoginRegisterError::InvalidCredentials); + } + Ok(user.id) }) } +fn verify_pat_credentials( + shard: &Rc<ServerNgShard>, + token: &str, +) -> Result<u32, LoginRegisterError> { + let token_hash = PersonalAccessToken::hash_token(token); + let now = IggyTimestamp::now(); + shard.plane.inner().0.mux_stm.inner().0.read(|users| { + let Some((user_id, pat)) = + users + .personal_access_tokens + .iter() + .find_map(|(user_id, tokens)| { + tokens + .values() + .find(|pat| pat.token.as_ref() == token_hash) + .map(|pat| (*user_id, pat)) + }) + else { + return Err(LoginRegisterError::InvalidToken); + }; + if pat.is_expired(now) { + return Err(LoginRegisterError::InvalidToken); + } + let Some(user) = users.items.get(user_id as usize) else { + return Err(LoginRegisterError::InvalidToken); + }; + if user.status != UserStatus::Active { + return Err(LoginRegisterError::UserInactive); + } + Ok(user.id) + }) +} + +#[allow(clippy::future_not_send)] +async fn complete_login_register( + shard: &Rc<ServerNgShard>, + sessions: &Rc<RefCell<SessionManager>>, + transport_client_id: u128, + vsr_client_id: u128, + request_header: &RequestHeader, + user_id: u32, +) -> Result<(), LoginRegisterError> { + if let Some((_, session)) = sessions.borrow().get_session(transport_client_id) { + let response = LoginRegisterResponse { user_id, session }.to_bytes(); + let reply = build_login_register_reply(request_header, vsr_client_id, session, &response); + let _ = shard + .bus + .send_to_client(transport_client_id, reply.into_generic().into_frozen()) + .await; + return Ok(()); + } + + { + let mut sessions = sessions.borrow_mut(); + sessions + .login(transport_client_id, user_id) + .map_err(LoginRegisterError::Session)?; + } + + let session = match shard + .plane + .inner() + .0 + .submit_register_in_process(vsr_client_id) + .await + { + Ok(session) => session, + Err(error) => { + let _ = sessions + .borrow_mut() + .reset_to_connected(transport_client_id); + return Err(LoginRegisterError::Transient(error)); + } + }; + + { + let mut sessions = sessions.borrow_mut(); + sessions + .bind_session(transport_client_id, vsr_client_id, session) + .map_err(LoginRegisterError::Session)?; + } + + let response = LoginRegisterResponse { user_id, session }.to_bytes(); + let reply = build_login_register_reply(request_header, vsr_client_id, session, &response); + if let Err(error) = shard + .bus + .send_to_client(transport_client_id, reply.into_generic().into_frozen()) + .await + { + warn!( + transport_client_id, + error = %error, + "failed to send login/register reply" + ); + } + + Ok(()) +} + +fn ensure_default_root_user(mux_stm: &ServerNgMuxStateMachine) { + if !mux_stm.inner().0.read(|users| users.items.is_empty()) { + return; + } + + let LegacyUser { + username, password, .. + } = server::bootstrap::create_root_user(); + mux_stm.inner().0.ensure_root_user(&username, &password); +} + +fn build_login_register_reply( + request_header: &RequestHeader, + client_id: u128, + session: u64, + body: &Bytes, +) -> Message<ReplyHeader> { + let total_size = std::mem::size_of::<ReplyHeader>() + body.len(); + let mut reply = Message::<ReplyHeader>::new(total_size); + let header = bytemuck::checked::try_from_bytes_mut::<ReplyHeader>( + &mut reply.as_mut_slice()[..std::mem::size_of::<ReplyHeader>()], + ) + .expect("zeroed bytes are valid"); + *header = ReplyHeader { + cluster: request_header.cluster, + size: total_size as u32, + view: request_header.view, + release: request_header.release, + command: Command2::Reply, + replica: request_header.replica, + request_checksum: request_header.request_checksum, + client: client_id, + op: session, + commit: session, + timestamp: request_header.timestamp, + request: request_header.request, + operation: request_header.operation, + namespace: request_header.namespace, + ..Default::default() + }; + if !body.is_empty() { + reply.as_mut_slice()[std::mem::size_of::<ReplyHeader>()..total_size].copy_from_slice(body); + } + reply +} + fn upgrade_shard_handle(shard_handle: &ServerNgShardHandle) -> Option<Rc<ServerNgShard>> { shard_handle .borrow() diff --git a/core/server-ng/src/login_register.rs b/core/server-ng/src/login_register.rs index f152421ff..e2a7c34ef 100644 --- a/core/server-ng/src/login_register.rs +++ b/core/server-ng/src/login_register.rs @@ -75,7 +75,7 @@ pub async fn handle_login_register<V, B, J, S, M>( verifier: &V, metadata: &LoginMetadata<'_, B, J, S, M>, session_manager: &mut SessionManager, - connection_id: u64, + connection_id: u128, ) -> Result<LoginRegisterResponse, LoginRegisterError> where V: CredentialVerifier, @@ -117,7 +117,7 @@ pub async fn handle_login_register_with_pat<T, B, J, S, M>( token_verifier: &T, metadata: &LoginMetadata<'_, B, J, S, M>, session_manager: &mut SessionManager, - connection_id: u64, + connection_id: u128, ) -> Result<LoginRegisterResponse, LoginRegisterError> where T: TokenVerifier, @@ -156,7 +156,7 @@ async fn complete_register<B, J, S, M>( user_id: u32, metadata: &LoginMetadata<'_, B, J, S, M>, session_manager: &mut SessionManager, - connection_id: u64, + connection_id: u128, ) -> Result<LoginRegisterResponse, LoginRegisterError> where B: MessageBus, diff --git a/core/server-ng/src/session_manager.rs b/core/server-ng/src/session_manager.rs index 0527c45e3..894c5adc0 100644 --- a/core/server-ng/src/session_manager.rs +++ b/core/server-ng/src/session_manager.rs @@ -77,11 +77,11 @@ pub struct Connection { /// per consensus session). If a client reconnects with the same `client_id`, /// the old connection must be evicted first. pub struct SessionManager { - connections: HashMap<u64, Connection>, + connections: HashMap<u128, Connection>, /// Reverse index: `client_id` → `connection_id` for fast lookup when /// a consensus reply arrives and needs routing to the right connection. - client_to_connection: HashMap<u128, u64>, - next_connection_id: u64, + client_to_connection: HashMap<u128, u128>, + next_connection_id: u128, } impl SessionManager { @@ -98,12 +98,12 @@ impl SessionManager { /// /// # Panics /// Panics if the connection ID counter overflows `u64::MAX`. - pub fn add_connection(&mut self, address: SocketAddr) -> u64 { + pub fn add_connection(&mut self, address: SocketAddr) -> u128 { let id = self.next_connection_id; self.next_connection_id = self .next_connection_id .checked_add(1) - .expect("connection ID overflow (u64::MAX connections without restart)"); + .expect("connection ID overflow (u128::MAX connections without restart)"); self.connections.insert( id, Connection { @@ -114,8 +114,15 @@ impl SessionManager { id } + pub fn ensure_connection(&mut self, connection_id: u128, address: SocketAddr) { + self.connections.entry(connection_id).or_insert(Connection { + address, + state: ConnectionState::Connected, + }); + } + /// Remove a connection (disconnect). Cleans up the reverse index if bound. - pub fn remove_connection(&mut self, connection_id: u64) { + pub fn remove_connection(&mut self, connection_id: u128) { if let Some(conn) = self.connections.remove(&connection_id) && let ConnectionState::Bound { client_id, .. } = conn.state { @@ -127,7 +134,7 @@ impl SessionManager { /// /// # Errors /// Returns `Err` if the connection doesn't exist or isn't in `Connected` state. - pub fn login(&mut self, connection_id: u64, user_id: u32) -> Result<(), SessionError> { + pub fn login(&mut self, connection_id: u128, user_id: u32) -> Result<(), SessionError> { let conn = self .connections .get_mut(&connection_id) @@ -153,7 +160,7 @@ impl SessionManager { /// /// # Errors /// Returns `Err` if the connection doesn't exist or isn't `Authenticated`. - pub fn reset_to_connected(&mut self, connection_id: u64) -> Result<(), SessionError> { + pub fn reset_to_connected(&mut self, connection_id: u128) -> Result<(), SessionError> { let conn = self .connections .get_mut(&connection_id) @@ -188,7 +195,7 @@ impl SessionManager { /// (impossible in single-threaded use). pub fn bind_session( &mut self, - connection_id: u64, + connection_id: u128, client_id: u128, session: u64, ) -> Result<(), SessionError> { @@ -227,7 +234,7 @@ impl SessionManager { /// /// Returns `(client_id, session)` if the connection is `Bound`, `None` otherwise. #[must_use] - pub fn get_session(&self, connection_id: u64) -> Option<(u128, u64)> { + pub fn get_session(&self, connection_id: u128) -> Option<(u128, u64)> { let conn = self.connections.get(&connection_id)?; match conn.state { ConnectionState::Bound { @@ -239,13 +246,13 @@ impl SessionManager { /// Look up the connection ID for a client (for routing consensus replies). #[must_use] - pub fn connection_for_client(&self, client_id: u128) -> Option<u64> { + pub fn connection_for_client(&self, client_id: u128) -> Option<u128> { self.client_to_connection.get(&client_id).copied() } /// Get connection metadata. #[must_use] - pub fn get_connection(&self, connection_id: u64) -> Option<&Connection> { + pub fn get_connection(&self, connection_id: u128) -> Option<&Connection> { self.connections.get(&connection_id) } @@ -270,9 +277,9 @@ impl Default for SessionManager { #[derive(Debug)] pub enum SessionError { - ConnectionNotFound(u64), + ConnectionNotFound(u128), InvalidTransition { - connection_id: u64, + connection_id: u128, from: &'static str, to: &'static str, },
