This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch auth-test in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 6b0f90a0e693ef4c9ba2b4041ebacdaadc786160 Author: Hubert Gruszecki <[email protected]> AuthorDate: Wed Jan 14 13:52:53 2026 +0100 feat(server): require auth for stats endpoint Remove /stats from PUBLIC_PATHS, add identity extraction to handler. Add exhaustive auth scenario test with EnumIter derive. Add ensure_authenticated to all binary handlers that require authentication. --- Cargo.lock | 3 +- DEPENDENCIES.md | 2 +- .../tests/tcp_test/offset_feature_deserialize.go | 5 +- core/integration/Cargo.toml | 1 + core/integration/tests/server/general.rs | 3 +- core/integration/tests/server/mod.rs | 8 +- .../server/scenarios/authentication_scenario.rs | 465 +++++++++++++++++++++ core/integration/tests/server/scenarios/mod.rs | 1 + core/sdk/src/http/http_client.rs | 1 - core/server/Cargo.toml | 2 +- core/server/server.http | 33 +- core/server/src/binary/command.rs | 2 +- .../cluster/get_cluster_metadata_handler.rs | 1 + .../create_consumer_group_handler.rs | 1 + .../delete_consumer_group_handler.rs | 1 + .../consumer_groups/join_consumer_group_handler.rs | 1 + .../leave_consumer_group_handler.rs | 1 + .../delete_consumer_offset_handler.rs | 1 + .../get_consumer_offset_handler.rs | 1 + .../store_consumer_offset_handler.rs | 1 + .../messages/flush_unsaved_buffer_handler.rs | 1 + .../handlers/messages/poll_messages_handler.rs | 1 + .../handlers/messages/send_messages_handler.rs | 1 + .../partitions/create_partitions_handler.rs | 1 + .../partitions/delete_partitions_handler.rs | 1 + .../create_personal_access_token_handler.rs | 1 + .../delete_personal_access_token_handler.rs | 1 + .../get_personal_access_tokens_handler.rs | 1 + .../handlers/streams/create_stream_handler.rs | 1 + .../handlers/streams/delete_stream_handler.rs | 1 + .../handlers/streams/purge_stream_handler.rs | 1 + .../handlers/streams/update_stream_handler.rs | 1 + .../binary/handlers/system/get_client_handler.rs | 1 + .../binary/handlers/system/get_clients_handler.rs | 1 + .../src/binary/handlers/system/get_me_handler.rs | 1 + .../{get_snapshot.rs => get_snapshot_handler.rs} | 1 + .../binary/handlers/system/get_stats_handler.rs | 1 + core/server/src/binary/handlers/system/mod.rs | 2 +- .../binary/handlers/topics/create_topic_handler.rs | 1 + .../binary/handlers/topics/delete_topic_handler.rs | 1 + .../binary/handlers/topics/purge_topic_handler.rs | 1 + .../binary/handlers/topics/update_topic_handler.rs | 1 + .../handlers/users/change_password_handler.rs | 1 + .../binary/handlers/users/create_user_handler.rs | 1 + .../binary/handlers/users/delete_user_handler.rs | 1 + .../src/binary/handlers/users/get_user_handler.rs | 1 + .../src/binary/handlers/users/get_users_handler.rs | 1 + .../binary/handlers/users/logout_user_handler.rs | 1 + .../handlers/users/update_permissions_handler.rs | 1 + .../binary/handlers/users/update_user_handler.rs | 1 + core/server/src/binary/macros.rs | 2 +- core/server/src/http/consumer_groups.rs | 4 - core/server/src/http/jwt/middleware.rs | 1 - core/server/src/http/streams.rs | 10 +- core/server/src/http/system.rs | 15 +- core/server/src/http/topics.rs | 4 - core/server/src/shard/system/clients.rs | 2 - core/server/src/shard/system/consumer_groups.rs | 4 - core/server/src/shard/system/consumer_offsets.rs | 3 - core/server/src/shard/system/partitions.rs | 2 - .../src/shard/system/personal_access_tokens.rs | 3 - core/server/src/shard/system/snapshot/mod.rs | 4 +- core/server/src/shard/system/streams.rs | 4 - core/server/src/shard/system/topics.rs | 4 - core/server/src/shard/system/users.rs | 10 - 65 files changed, 560 insertions(+), 77 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 88953e444..a9e03e889 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5035,6 +5035,7 @@ dependencies = [ "serde_json", "serial_test", "server", + "strum 0.27.2", "strum_macros 0.27.2", "tempfile", "test-case", @@ -8290,7 +8291,7 @@ dependencies = [ [[package]] name = "server" -version = "0.6.1-edge.3" +version = "0.6.1-edge.4" dependencies = [ "ahash 0.8.12", "anyhow", diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index 31143eead..ac097d204 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -729,7 +729,7 @@ serde_with_macros: 3.16.1, "Apache-2.0 OR MIT", serde_yaml_ng: 0.10.0, "MIT", serial_test: 3.2.0, "MIT", serial_test_derive: 3.2.0, "MIT", -server: 0.6.1-edge.3, "Apache-2.0", +server: 0.6.1-edge.4, "Apache-2.0", sha1: 0.10.6, "Apache-2.0 OR MIT", sha2: 0.10.9, "Apache-2.0 OR MIT", sha3: 0.10.8, "Apache-2.0 OR MIT", diff --git a/bdd/go/tests/tcp_test/offset_feature_deserialize.go b/bdd/go/tests/tcp_test/offset_feature_deserialize.go index abec1494e..08da310c5 100644 --- a/bdd/go/tests/tcp_test/offset_feature_deserialize.go +++ b/bdd/go/tests/tcp_test/offset_feature_deserialize.go @@ -149,15 +149,14 @@ var _ = ginkgo.Describe("GET CONSUMER OFFSET:", func() { consumer := iggcon.NewGroupConsumer(randomU32Identifier()) partitionId := uint32(1) - offset, err := client.GetConsumerOffset( + _, err := client.GetConsumerOffset( consumer, streamIdentifier, topicIdentifier, &partitionId, ) - itShouldNotReturnError(err) - itShouldReturnNilOffsetForNewConsumerGroup(offset) + itShouldReturnUnauthenticatedError(err) }) ginkgo.Context("and tries to get offset from non-existing stream", func() { diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml index bc4160710..e30ef758c 100644 --- a/core/integration/Cargo.toml +++ b/core/integration/Cargo.toml @@ -58,6 +58,7 @@ rmcp = { version = "0.12.0", features = [ serde_json = { workspace = true } serial_test = { workspace = true } server = { workspace = true } +strum = { workspace = true } strum_macros = { workspace = true } tempfile = { workspace = true } test-case = { workspace = true } diff --git a/core/integration/tests/server/general.rs b/core/integration/tests/server/general.rs index 9aa0ef04c..f2bb3de10 100644 --- a/core/integration/tests/server/general.rs +++ b/core/integration/tests/server/general.rs @@ -16,7 +16,7 @@ // under the License. use crate::server::{ - ScenarioFn, bench_scenario, consumer_timestamp_polling_scenario, + ScenarioFn, authentication_scenario, bench_scenario, consumer_timestamp_polling_scenario, create_message_payload_scenario, message_headers_scenario, run_scenario, stream_size_validation_scenario, system_scenario, user_scenario, }; @@ -27,6 +27,7 @@ use test_case::test_matrix; #[test_matrix( [quic(), tcp(), http(), websocket()], [ + authentication_scenario(), system_scenario(), user_scenario(), message_headers_scenario(), diff --git a/core/integration/tests/server/mod.rs b/core/integration/tests/server/mod.rs index f1a3524ea..0b8fd502b 100644 --- a/core/integration/tests/server/mod.rs +++ b/core/integration/tests/server/mod.rs @@ -32,8 +32,8 @@ use integration::{ websocket_client::WebSocketClientFactory, }; use scenarios::{ - bench_scenario, consumer_group_auto_commit_reconnection_scenario, consumer_group_join_scenario, - consumer_group_offset_cleanup_scenario, + authentication_scenario, bench_scenario, consumer_group_auto_commit_reconnection_scenario, + consumer_group_join_scenario, consumer_group_offset_cleanup_scenario, consumer_group_with_multiple_clients_polling_messages_scenario, consumer_group_with_single_client_polling_messages_scenario, consumer_timestamp_polling_scenario, create_message_payload, message_headers_scenario, @@ -46,6 +46,10 @@ use std::{collections::HashMap, future::Future}; type ScenarioFn = fn(&dyn ClientFactory) -> Pin<Box<dyn Future<Output = ()> + '_>>; +fn authentication_scenario() -> ScenarioFn { + |factory| Box::pin(authentication_scenario::run(factory)) +} + fn system_scenario() -> ScenarioFn { |factory| Box::pin(system_scenario::run(factory)) } diff --git a/core/integration/tests/server/scenarios/authentication_scenario.rs b/core/integration/tests/server/scenarios/authentication_scenario.rs new file mode 100644 index 000000000..f793e49a4 --- /dev/null +++ b/core/integration/tests/server/scenarios/authentication_scenario.rs @@ -0,0 +1,465 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//! Authentication scenario test. +//! +//! Validates that all commands require authentication except: +//! - Ping (health check) +//! - LoginUser (authenticate with username/password) +//! - LoginWithPersonalAccessToken (authenticate with PAT) +//! +//! Uses exhaustive matching on `ServerCommand` to ensure no command is missed +//! when new commands are added. + +use crate::server::scenarios::create_client; +use bytes::Bytes; +use iggy::prelude::*; +use integration::test_server::{ClientFactory, login_root}; +use server::binary::command::ServerCommand; +use strum::IntoEnumIterator; + +const STREAM_NAME: &str = "auth-test-stream"; +const TOPIC_NAME: &str = "auth-test-topic"; + +/// Shared test context with identifiers used across command tests. +struct TestContext { + stream_id: Identifier, + topic_id: Identifier, + user_id: Identifier, + group_id: Identifier, + consumer: Consumer, +} + +impl TestContext { + fn new() -> Self { + Self { + stream_id: Identifier::named(STREAM_NAME).unwrap(), + topic_id: Identifier::named(TOPIC_NAME).unwrap(), + user_id: Identifier::numeric(1).unwrap(), + group_id: Identifier::named("test-group").unwrap(), + consumer: Consumer::default(), + } + } +} + +pub async fn run(client_factory: &dyn ClientFactory) { + let client = create_client(client_factory).await; + + // Phase 1: Verify ping works without auth + client.ping().await.expect("ping should work without auth"); + + // Phase 2: Verify all protected commands fail without auth + test_all_commands_require_auth(&client).await; + + // Phase 3: Login and verify commands work + let identity = login_root(&client).await; + assert_eq!(identity.user_id, 0, "root user should have id 0"); + setup_test_resources(&client).await; + verify_auth_works(&client).await; + + // Phase 4: Logout and verify commands fail again + client.logout_user().await.expect("logout should succeed"); + test_all_commands_require_auth(&client).await; + + // Phase 5: Test PAT authentication + login_root(&client).await; + let raw_pat = client + .create_personal_access_token("auth-test-pat", PersonalAccessTokenExpiry::NeverExpire) + .await + .expect("should create PAT"); + + client.logout_user().await.expect("logout should succeed"); + assert!( + client.get_streams().await.is_err(), + "should fail after logout" + ); + + let identity = client + .login_with_personal_access_token(&raw_pat.token) + .await + .expect("PAT login should work"); + assert_eq!(identity.user_id, 0, "PAT should authenticate as root"); + + client + .get_streams() + .await + .expect("get_streams should work after PAT login"); + + cleanup_test_resources(&client).await; +} + +/// Tests all commands require authentication using exhaustive matching. +/// If a new command is added to ServerCommand, this match becomes non-exhaustive. +async fn test_all_commands_require_auth(client: &IggyClient) { + let ctx = TestContext::new(); + + for cmd in ServerCommand::iter() { + let (name, result): (&str, Result<(), IggyError>) = match cmd { + // ================================================================ + // NO AUTH REQUIRED (3 commands) + // ================================================================ + ServerCommand::Ping(_) + | ServerCommand::LoginUser(_) + | ServerCommand::LoginWithPersonalAccessToken(_) => continue, + + // ================================================================ + // STATEFUL - NOT SUPPORTED ON HTTP (2 commands) + // ================================================================ + ServerCommand::JoinConsumerGroup(_) | ServerCommand::LeaveConsumerGroup(_) => continue, + + // ================================================================ + // SPECIAL SETUP REQUIRED (3 commands) + // ================================================================ + ServerCommand::GetSnapshot(_) + | ServerCommand::DeleteSegments(_) + | ServerCommand::LogoutUser(_) => continue, + + // ================================================================ + // REQUIRES AUTH (39 commands) + // ================================================================ + + // System + ServerCommand::GetStats(_) => ("GetStats", client.get_stats().await.map(|_| ())), + ServerCommand::GetMe(_) => ("GetMe", client.get_me().await.map(|_| ())), + ServerCommand::GetClient(_) => ("GetClient", client.get_client(1).await.map(|_| ())), + ServerCommand::GetClients(_) => ("GetClients", client.get_clients().await.map(|_| ())), + ServerCommand::GetClusterMetadata(_) => ( + "GetClusterMetadata", + client.get_cluster_metadata().await.map(|_| ()), + ), + + // Users + ServerCommand::GetUser(_) => { + ("GetUser", client.get_user(&ctx.user_id).await.map(|_| ())) + } + ServerCommand::GetUsers(_) => ("GetUsers", client.get_users().await.map(|_| ())), + ServerCommand::CreateUser(_) => ( + "CreateUser", + client + .create_user("test", "test", UserStatus::Active, None) + .await + .map(|_| ()), + ), + ServerCommand::DeleteUser(_) => ("DeleteUser", client.delete_user(&ctx.user_id).await), + ServerCommand::UpdateUser(_) => ( + "UpdateUser", + client.update_user(&ctx.user_id, Some("x"), None).await, + ), + ServerCommand::UpdatePermissions(_) => ( + "UpdatePermissions", + client.update_permissions(&ctx.user_id, None).await, + ), + ServerCommand::ChangePassword(_) => ( + "ChangePassword", + client.change_password(&ctx.user_id, "old", "new").await, + ), + + // PAT + ServerCommand::GetPersonalAccessTokens(_) => ( + "GetPersonalAccessTokens", + client.get_personal_access_tokens().await.map(|_| ()), + ), + ServerCommand::CreatePersonalAccessToken(_) => ( + "CreatePersonalAccessToken", + client + .create_personal_access_token("x", PersonalAccessTokenExpiry::NeverExpire) + .await + .map(|_| ()), + ), + ServerCommand::DeletePersonalAccessToken(_) => ( + "DeletePersonalAccessToken", + client.delete_personal_access_token("x").await, + ), + + // Streams + ServerCommand::GetStream(_) => ( + "GetStream", + client.get_stream(&ctx.stream_id).await.map(|_| ()), + ), + ServerCommand::GetStreams(_) => ("GetStreams", client.get_streams().await.map(|_| ())), + ServerCommand::CreateStream(_) => { + ("CreateStream", client.create_stream("x").await.map(|_| ())) + } + ServerCommand::DeleteStream(_) => { + ("DeleteStream", client.delete_stream(&ctx.stream_id).await) + } + ServerCommand::UpdateStream(_) => ( + "UpdateStream", + client.update_stream(&ctx.stream_id, "x").await, + ), + ServerCommand::PurgeStream(_) => { + ("PurgeStream", client.purge_stream(&ctx.stream_id).await) + } + + // Topics + ServerCommand::GetTopic(_) => ( + "GetTopic", + client + .get_topic(&ctx.stream_id, &ctx.topic_id) + .await + .map(|_| ()), + ), + ServerCommand::GetTopics(_) => ( + "GetTopics", + client.get_topics(&ctx.stream_id).await.map(|_| ()), + ), + ServerCommand::CreateTopic(_) => ( + "CreateTopic", + client + .create_topic( + &ctx.stream_id, + "x", + 1, + CompressionAlgorithm::None, + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .map(|_| ()), + ), + ServerCommand::DeleteTopic(_) => ( + "DeleteTopic", + client.delete_topic(&ctx.stream_id, &ctx.topic_id).await, + ), + ServerCommand::UpdateTopic(_) => ( + "UpdateTopic", + client + .update_topic( + &ctx.stream_id, + &ctx.topic_id, + "x", + CompressionAlgorithm::None, + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await, + ), + ServerCommand::PurgeTopic(_) => ( + "PurgeTopic", + client.purge_topic(&ctx.stream_id, &ctx.topic_id).await, + ), + + // Partitions + ServerCommand::CreatePartitions(_) => ( + "CreatePartitions", + client + .create_partitions(&ctx.stream_id, &ctx.topic_id, 1) + .await, + ), + ServerCommand::DeletePartitions(_) => ( + "DeletePartitions", + client + .delete_partitions(&ctx.stream_id, &ctx.topic_id, 1) + .await, + ), + + // Messages + ServerCommand::SendMessages(_) => { + let mut msgs = vec![ + IggyMessage::builder() + .payload(Bytes::from("x")) + .build() + .unwrap(), + ]; + ( + "SendMessages", + client + .send_messages( + &ctx.stream_id, + &ctx.topic_id, + &Partitioning::partition_id(0), + &mut msgs, + ) + .await, + ) + } + ServerCommand::PollMessages(_) => ( + "PollMessages", + client + .poll_messages( + &ctx.stream_id, + &ctx.topic_id, + Some(0), + &ctx.consumer, + &PollingStrategy::offset(0), + 1, + false, + ) + .await + .map(|_| ()), + ), + ServerCommand::FlushUnsavedBuffer(_) => ( + "FlushUnsavedBuffer", + client + .flush_unsaved_buffer(&ctx.stream_id, &ctx.topic_id, 0, false) + .await, + ), + + // Consumer Offsets + ServerCommand::GetConsumerOffset(_) => ( + "GetConsumerOffset", + client + .get_consumer_offset(&ctx.consumer, &ctx.stream_id, &ctx.topic_id, Some(0)) + .await + .map(|_| ()), + ), + ServerCommand::StoreConsumerOffset(_) => ( + "StoreConsumerOffset", + client + .store_consumer_offset(&ctx.consumer, &ctx.stream_id, &ctx.topic_id, Some(0), 0) + .await, + ), + ServerCommand::DeleteConsumerOffset(_) => ( + "DeleteConsumerOffset", + client + .delete_consumer_offset(&ctx.consumer, &ctx.stream_id, &ctx.topic_id, Some(0)) + .await, + ), + + // Consumer Groups + ServerCommand::GetConsumerGroup(_) => ( + "GetConsumerGroup", + client + .get_consumer_group(&ctx.stream_id, &ctx.topic_id, &ctx.group_id) + .await + .map(|_| ()), + ), + ServerCommand::GetConsumerGroups(_) => ( + "GetConsumerGroups", + client + .get_consumer_groups(&ctx.stream_id, &ctx.topic_id) + .await + .map(|_| ()), + ), + ServerCommand::CreateConsumerGroup(_) => ( + "CreateConsumerGroup", + client + .create_consumer_group(&ctx.stream_id, &ctx.topic_id, "x") + .await + .map(|_| ()), + ), + ServerCommand::DeleteConsumerGroup(_) => ( + "DeleteConsumerGroup", + client + .delete_consumer_group(&ctx.stream_id, &ctx.topic_id, &ctx.group_id) + .await, + ), + }; + + assert_unauthenticated(result, name); + } +} + +fn assert_unauthenticated(result: Result<(), IggyError>, name: &str) { + match result { + Err(e) if e.as_code() == IggyError::FeatureUnavailable.as_code() => {} + Err(e) if e.as_code() == IggyError::Unauthenticated.as_code() => {} + Err(e) => panic!( + "{name}: expected Unauthenticated ({}), got {:?} ({})", + IggyError::Unauthenticated.as_code(), + e, + e.as_code() + ), + Ok(()) => panic!("{name}: expected Unauthenticated, got Ok"), + } +} + +/// Verifies a subset of commands work when authenticated. +async fn verify_auth_works(client: &IggyClient) { + let ctx = TestContext::new(); + + // System + client.get_stats().await.expect("get_stats"); + client.get_clients().await.expect("get_clients"); + client + .get_cluster_metadata() + .await + .expect("get_cluster_metadata"); + + // Streams & Topics + client.get_streams().await.expect("get_streams"); + client.get_stream(&ctx.stream_id).await.expect("get_stream"); + client.get_topics(&ctx.stream_id).await.expect("get_topics"); + client + .get_topic(&ctx.stream_id, &ctx.topic_id) + .await + .expect("get_topic"); + + // Messages + let mut msgs = vec![ + IggyMessage::builder() + .payload(Bytes::from("test")) + .build() + .unwrap(), + ]; + client + .send_messages( + &ctx.stream_id, + &ctx.topic_id, + &Partitioning::partition_id(0), + &mut msgs, + ) + .await + .expect("send_messages"); + + client + .poll_messages( + &ctx.stream_id, + &ctx.topic_id, + Some(0), + &ctx.consumer, + &PollingStrategy::offset(0), + 10, + false, + ) + .await + .expect("poll_messages"); + + // Users & PATs + client.get_users().await.expect("get_users"); + client.get_personal_access_tokens().await.expect("get_pats"); +} + +async fn setup_test_resources(client: &IggyClient) { + client + .create_stream(STREAM_NAME) + .await + .expect("create stream"); + client + .create_topic( + &Identifier::named(STREAM_NAME).unwrap(), + TOPIC_NAME, + 1, + CompressionAlgorithm::None, + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .expect("create topic"); +} + +async fn cleanup_test_resources(client: &IggyClient) { + let _ = client.delete_personal_access_token("auth-test-pat").await; + client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .expect("delete stream"); +} diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs index 57ac2ddaa..1676cefa3 100644 --- a/core/integration/tests/server/scenarios/mod.rs +++ b/core/integration/tests/server/scenarios/mod.rs @@ -16,6 +16,7 @@ * under the License. */ +pub mod authentication_scenario; pub mod bench_scenario; pub mod concurrent_scenario; pub mod consumer_group_auto_commit_reconnection_scenario; diff --git a/core/sdk/src/http/http_client.rs b/core/sdk/src/http/http_client.rs index 05e63fab1..a6ddcaa65 100644 --- a/core/sdk/src/http/http_client.rs +++ b/core/sdk/src/http/http_client.rs @@ -38,7 +38,6 @@ const PUBLIC_PATHS: &[&str] = &[ "/", "/metrics", "/ping", - "/stats", "/users/login", "/users/refresh-token", "/personal-access-tokens/login", diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 3e3bb5dc4..fd72144be 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "server" -version = "0.6.1-edge.3" +version = "0.6.1-edge.4" edition = "2024" license = "Apache-2.0" diff --git a/core/server/server.http b/core/server/server.http index dd52429e4..02a7d2d13 100644 --- a/core/server/server.http +++ b/core/server/server.http @@ -42,11 +42,29 @@ GET {{url}} ### GET {{url}}/ping +### +POST {{url}}/users/login +Content-Type: application/json + +{ + "username": "{{root_username}}", + "password": "{{root_password}}" +} + +### +POST {{url}}/personal-access-tokens/login +Content-Type: application/json + +{ + "token": "{{pat_raw_token}}" +} + ### GET {{url}}/metrics ### GET {{url}}/stats +Authorization: Bearer {{access_token}} ### GET {{url}}/cluster/metadata @@ -60,16 +78,6 @@ Authorization: Bearer {{access_token}} GET {{url}}/clients/{{client_id}} Authorization: Bearer {{access_token}} - -### -POST {{url}}/users/login -Content-Type: application/json - -{ - "username": "{{root_username}}", - "password": "{{root_password}}" -} - ### POST {{url}}/users/refresh-token Content-Type: application/json @@ -183,12 +191,7 @@ Content-Type: application/json } ### -POST {{url}}/personal-access-tokens/login -Content-Type: application/json -{ - "token": "{{pat_raw_token}}" -} ### DELETE {{url}}/personal-access-tokens/{{pat_name}} diff --git a/core/server/src/binary/command.rs b/core/server/src/binary/command.rs index e10fe6ab8..78a2b4417 100644 --- a/core/server/src/binary/command.rs +++ b/core/server/src/binary/command.rs @@ -68,7 +68,7 @@ use iggy_common::update_topic::UpdateTopic; use iggy_common::update_user::UpdateUser; use iggy_common::*; use std::rc::Rc; -use strum::EnumString; +use strum::{EnumIter, EnumString}; use tracing::error; define_server_command_enum! { diff --git a/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs b/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs index af6c7a9a0..501c697ba 100644 --- a/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs +++ b/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs @@ -44,6 +44,7 @@ impl ServerCommandHandler for GetClusterMetadata { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let cluster_metadata = shard.get_cluster_metadata(session)?; diff --git a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs index 192d867f5..fe1350a9f 100644 --- a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs @@ -49,6 +49,7 @@ impl ServerCommandHandler for CreateConsumerGroup { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let cg = shard.create_consumer_group( session, &self.stream_id, diff --git a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs index 05c3cd9de..07751b591 100644 --- a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs @@ -49,6 +49,7 @@ impl ServerCommandHandler for DeleteConsumerGroup { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let cg = shard.delete_consumer_group(session, &self.stream_id, &self.topic_id, &self.group_id).error(|e: &IggyError| { format!( "{COMPONENT} (error: {e}) - failed to delete consumer group with ID: {} for topic with ID: {} in stream with ID: {} for session: {}", diff --git a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs index cc9a8e27b..8aab2dca5 100644 --- a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs @@ -45,6 +45,7 @@ impl ServerCommandHandler for JoinConsumerGroup { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; shard .join_consumer_group( session, diff --git a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs index b56d94a48..32f1c4d5a 100644 --- a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs @@ -46,6 +46,7 @@ impl ServerCommandHandler for LeaveConsumerGroup { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; shard .leave_consumer_group( diff --git a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs index 45f22349d..64d01255e 100644 --- a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs +++ b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs @@ -43,6 +43,7 @@ impl ServerCommandHandler for DeleteConsumerOffset { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; shard .delete_consumer_offset( session, diff --git a/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs b/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs index 29ee6f310..7b8991981 100644 --- a/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs +++ b/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs @@ -43,6 +43,7 @@ impl ServerCommandHandler for GetConsumerOffset { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let Ok(offset) = shard .get_consumer_offset( session, diff --git a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs index dcbb79db3..c603f4cb8 100644 --- a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs +++ b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs @@ -45,6 +45,7 @@ impl ServerCommandHandler for StoreConsumerOffset { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; shard .store_consumer_offset( session, diff --git a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs index 937a03cb3..008d1c600 100644 --- a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs +++ b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs @@ -43,6 +43,7 @@ impl ServerCommandHandler for FlushUnsavedBuffer { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let user_id = session.get_user_id(); let stream_id = self.stream_id.clone(); diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs b/core/server/src/binary/handlers/messages/poll_messages_handler.rs index 0839e3a55..ea8ccbb8c 100644 --- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs @@ -57,6 +57,7 @@ impl ServerCommandHandler for PollMessages { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let PollMessages { consumer, partition_id, diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs index 375848799..6aa6786ac 100644 --- a/core/server/src/binary/handlers/messages/send_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs @@ -53,6 +53,7 @@ impl ServerCommandHandler for SendMessages { session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { + shard.ensure_authenticated(session)?; let total_payload_size = length as usize - std::mem::size_of::<u32>(); let metadata_len_field_size = std::mem::size_of::<u32>(); diff --git a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs index 5c1930a8a..85b4e3030 100644 --- a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs +++ b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs @@ -49,6 +49,7 @@ impl ServerCommandHandler for CreatePartitions { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; // Acquire partition lock to serialize filesystem operations let _partition_guard = shard.fs_locks.partition_lock.lock().await; diff --git a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs index 9090ec2c8..f4bed945f 100644 --- a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs +++ b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs @@ -47,6 +47,7 @@ impl ServerCommandHandler for DeletePartitions { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let stream_id = self.stream_id.clone(); let topic_id = self.topic_id.clone(); diff --git a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs index 50b896fb0..314f1821d 100644 --- a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs +++ b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs @@ -49,6 +49,7 @@ impl ServerCommandHandler for CreatePersonalAccessToken { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let (personal_access_token, token) = shard .create_personal_access_token(session, &self.name, self.expiry) diff --git a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs index bf89c70ca..edb1e8e9a 100644 --- a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs +++ b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs @@ -45,6 +45,7 @@ impl ServerCommandHandler for DeletePersonalAccessToken { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let token_name = self.name.clone(); shard diff --git a/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs index 9e2923868..2f020c983 100644 --- a/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs +++ b/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs @@ -44,6 +44,7 @@ impl ServerCommandHandler for GetPersonalAccessTokens { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let personal_access_tokens = shard .get_personal_access_tokens(session) .error(|e: &IggyError| { diff --git a/core/server/src/binary/handlers/streams/create_stream_handler.rs b/core/server/src/binary/handlers/streams/create_stream_handler.rs index c2935ef6a..4d9e85d5d 100644 --- a/core/server/src/binary/handlers/streams/create_stream_handler.rs +++ b/core/server/src/binary/handlers/streams/create_stream_handler.rs @@ -54,6 +54,7 @@ impl ServerCommandHandler for CreateStream { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let request = ShardRequest { stream_id: Identifier::default(), diff --git a/core/server/src/binary/handlers/streams/delete_stream_handler.rs b/core/server/src/binary/handlers/streams/delete_stream_handler.rs index d1e23cb04..addc25af3 100644 --- a/core/server/src/binary/handlers/streams/delete_stream_handler.rs +++ b/core/server/src/binary/handlers/streams/delete_stream_handler.rs @@ -52,6 +52,7 @@ impl ServerCommandHandler for DeleteStream { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let stream_id = self.stream_id.clone(); let request = ShardRequest { stream_id: Identifier::default(), diff --git a/core/server/src/binary/handlers/streams/purge_stream_handler.rs b/core/server/src/binary/handlers/streams/purge_stream_handler.rs index bfa189613..c688ecd96 100644 --- a/core/server/src/binary/handlers/streams/purge_stream_handler.rs +++ b/core/server/src/binary/handlers/streams/purge_stream_handler.rs @@ -47,6 +47,7 @@ impl ServerCommandHandler for PurgeStream { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let stream_id = self.stream_id.clone(); shard diff --git a/core/server/src/binary/handlers/streams/update_stream_handler.rs b/core/server/src/binary/handlers/streams/update_stream_handler.rs index 8e4de6574..3986f589c 100644 --- a/core/server/src/binary/handlers/streams/update_stream_handler.rs +++ b/core/server/src/binary/handlers/streams/update_stream_handler.rs @@ -47,6 +47,7 @@ impl ServerCommandHandler for UpdateStream { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let stream_id = self.stream_id.clone(); shard .update_stream(session, &self.stream_id, self.name.clone()) diff --git a/core/server/src/binary/handlers/system/get_client_handler.rs b/core/server/src/binary/handlers/system/get_client_handler.rs index b9e16aef3..b62cd076e 100644 --- a/core/server/src/binary/handlers/system/get_client_handler.rs +++ b/core/server/src/binary/handlers/system/get_client_handler.rs @@ -42,6 +42,7 @@ impl ServerCommandHandler for GetClient { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let Ok(client) = shard.get_client(session, self.client_id) else { sender.send_empty_ok_response().await?; diff --git a/core/server/src/binary/handlers/system/get_clients_handler.rs b/core/server/src/binary/handlers/system/get_clients_handler.rs index 81ac21b5f..5e365fa8b 100644 --- a/core/server/src/binary/handlers/system/get_clients_handler.rs +++ b/core/server/src/binary/handlers/system/get_clients_handler.rs @@ -44,6 +44,7 @@ impl ServerCommandHandler for GetClients { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let clients = shard.get_clients(session).error(|e: &IggyError| { format!("{COMPONENT} (error: {e}) - failed to get clients, session: {session}") diff --git a/core/server/src/binary/handlers/system/get_me_handler.rs b/core/server/src/binary/handlers/system/get_me_handler.rs index 339063ab3..6f9209bb1 100644 --- a/core/server/src/binary/handlers/system/get_me_handler.rs +++ b/core/server/src/binary/handlers/system/get_me_handler.rs @@ -42,6 +42,7 @@ impl ServerCommandHandler for GetMe { session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { + shard.ensure_authenticated(session)?; let Some(client) = shard.get_client(session, session.client_id).error( |e: &IggyError| { format!( diff --git a/core/server/src/binary/handlers/system/get_snapshot.rs b/core/server/src/binary/handlers/system/get_snapshot_handler.rs similarity index 97% rename from core/server/src/binary/handlers/system/get_snapshot.rs rename to core/server/src/binary/handlers/system/get_snapshot_handler.rs index cc2163b91..809f8fd60 100644 --- a/core/server/src/binary/handlers/system/get_snapshot.rs +++ b/core/server/src/binary/handlers/system/get_snapshot_handler.rs @@ -42,6 +42,7 @@ impl ServerCommandHandler for GetSnapshot { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let snapshot = shard .get_snapshot(session, self.compression, &self.snapshot_types) diff --git a/core/server/src/binary/handlers/system/get_stats_handler.rs b/core/server/src/binary/handlers/system/get_stats_handler.rs index 2fe368ce1..ea787bea6 100644 --- a/core/server/src/binary/handlers/system/get_stats_handler.rs +++ b/core/server/src/binary/handlers/system/get_stats_handler.rs @@ -48,6 +48,7 @@ impl ServerCommandHandler for GetStats { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; // Route GetStats to shard0 only let request = ShardRequest { diff --git a/core/server/src/binary/handlers/system/mod.rs b/core/server/src/binary/handlers/system/mod.rs index ae177f250..28398754a 100644 --- a/core/server/src/binary/handlers/system/mod.rs +++ b/core/server/src/binary/handlers/system/mod.rs @@ -19,7 +19,7 @@ pub mod get_client_handler; pub mod get_clients_handler; pub mod get_me_handler; -pub mod get_snapshot; +pub mod get_snapshot_handler; pub mod get_stats_handler; pub mod ping_handler; diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs b/core/server/src/binary/handlers/topics/create_topic_handler.rs index fa5c0270a..95cbbe4c5 100644 --- a/core/server/src/binary/handlers/topics/create_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs @@ -55,6 +55,7 @@ impl ServerCommandHandler for CreateTopic { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let request = ShardRequest { stream_id: Identifier::default(), diff --git a/core/server/src/binary/handlers/topics/delete_topic_handler.rs b/core/server/src/binary/handlers/topics/delete_topic_handler.rs index 668dbd707..85db4d075 100644 --- a/core/server/src/binary/handlers/topics/delete_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/delete_topic_handler.rs @@ -54,6 +54,7 @@ impl ServerCommandHandler for DeleteTopic { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let request = ShardRequest { stream_id: Identifier::default(), diff --git a/core/server/src/binary/handlers/topics/purge_topic_handler.rs b/core/server/src/binary/handlers/topics/purge_topic_handler.rs index 6e86293ae..012795da6 100644 --- a/core/server/src/binary/handlers/topics/purge_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/purge_topic_handler.rs @@ -45,6 +45,7 @@ impl ServerCommandHandler for PurgeTopic { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let topic_id = self.topic_id.clone(); let stream_id = self.stream_id.clone(); diff --git a/core/server/src/binary/handlers/topics/update_topic_handler.rs b/core/server/src/binary/handlers/topics/update_topic_handler.rs index 90f5e11ec..aa8912fa7 100644 --- a/core/server/src/binary/handlers/topics/update_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/update_topic_handler.rs @@ -52,6 +52,7 @@ impl ServerCommandHandler for UpdateTopic { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let request = ShardRequest { stream_id: Identifier::default(), diff --git a/core/server/src/binary/handlers/users/change_password_handler.rs b/core/server/src/binary/handlers/users/change_password_handler.rs index 0dadd98bb..9b11969cc 100644 --- a/core/server/src/binary/handlers/users/change_password_handler.rs +++ b/core/server/src/binary/handlers/users/change_password_handler.rs @@ -49,6 +49,7 @@ impl ServerCommandHandler for ChangePassword { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; info!("Changing password for user with ID: {}...", self.user_id); shard diff --git a/core/server/src/binary/handlers/users/create_user_handler.rs b/core/server/src/binary/handlers/users/create_user_handler.rs index 3cace71b3..57501efc0 100644 --- a/core/server/src/binary/handlers/users/create_user_handler.rs +++ b/core/server/src/binary/handlers/users/create_user_handler.rs @@ -54,6 +54,7 @@ impl ServerCommandHandler for CreateUser { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let request = ShardRequest { stream_id: Identifier::default(), diff --git a/core/server/src/binary/handlers/users/delete_user_handler.rs b/core/server/src/binary/handlers/users/delete_user_handler.rs index a7fa3ec1b..0c9151f91 100644 --- a/core/server/src/binary/handlers/users/delete_user_handler.rs +++ b/core/server/src/binary/handlers/users/delete_user_handler.rs @@ -53,6 +53,7 @@ impl ServerCommandHandler for DeleteUser { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let request = ShardRequest { stream_id: Identifier::default(), diff --git a/core/server/src/binary/handlers/users/get_user_handler.rs b/core/server/src/binary/handlers/users/get_user_handler.rs index 97322a340..4a60cdda4 100644 --- a/core/server/src/binary/handlers/users/get_user_handler.rs +++ b/core/server/src/binary/handlers/users/get_user_handler.rs @@ -43,6 +43,7 @@ impl ServerCommandHandler for GetUser { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let Ok(user) = shard.find_user(session, &self.user_id) else { sender.send_empty_ok_response().await?; return Ok(HandlerResult::Finished); diff --git a/core/server/src/binary/handlers/users/get_users_handler.rs b/core/server/src/binary/handlers/users/get_users_handler.rs index bca683a0a..b0ff5e76e 100644 --- a/core/server/src/binary/handlers/users/get_users_handler.rs +++ b/core/server/src/binary/handlers/users/get_users_handler.rs @@ -45,6 +45,7 @@ impl ServerCommandHandler for GetUsers { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let users = shard.get_users(session).await.error(|e: &IggyError| { format!("{COMPONENT} (error: {e}) - failed to get users, session: {session}") })?; diff --git a/core/server/src/binary/handlers/users/logout_user_handler.rs b/core/server/src/binary/handlers/users/logout_user_handler.rs index 714815173..f09a20b36 100644 --- a/core/server/src/binary/handlers/users/logout_user_handler.rs +++ b/core/server/src/binary/handlers/users/logout_user_handler.rs @@ -47,6 +47,7 @@ impl ServerCommandHandler for LogoutUser { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; info!("Logging out user with ID: {}...", session.get_user_id()); shard.logout_user(session).error(|e: &IggyError| { format!("{COMPONENT} (error: {e}) - failed to logout user, session: {session}") diff --git a/core/server/src/binary/handlers/users/update_permissions_handler.rs b/core/server/src/binary/handlers/users/update_permissions_handler.rs index 30829a840..5e46b8a9b 100644 --- a/core/server/src/binary/handlers/users/update_permissions_handler.rs +++ b/core/server/src/binary/handlers/users/update_permissions_handler.rs @@ -49,6 +49,7 @@ impl ServerCommandHandler for UpdatePermissions { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; shard .update_permissions(session, &self.user_id, self.permissions.clone()) diff --git a/core/server/src/binary/handlers/users/update_user_handler.rs b/core/server/src/binary/handlers/users/update_user_handler.rs index bdb929136..433e024af 100644 --- a/core/server/src/binary/handlers/users/update_user_handler.rs +++ b/core/server/src/binary/handlers/users/update_user_handler.rs @@ -49,6 +49,7 @@ impl ServerCommandHandler for UpdateUser { shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); + shard.ensure_authenticated(session)?; let user =shard .update_user( diff --git a/core/server/src/binary/macros.rs b/core/server/src/binary/macros.rs index a56c0bee0..f69632c4c 100644 --- a/core/server/src/binary/macros.rs +++ b/core/server/src/binary/macros.rs @@ -33,7 +33,7 @@ macro_rules! define_server_command_enum { );* $(;)? ) => { #[enum_dispatch(ServerCommandHandler)] - #[derive(Debug, PartialEq, EnumString)] + #[derive(Debug, PartialEq, EnumString, EnumIter)] pub enum ServerCommand { $( $variant($ty), diff --git a/core/server/src/http/consumer_groups.rs b/core/server/src/http/consumer_groups.rs index 5faf48adb..6f66de799 100644 --- a/core/server/src/http/consumer_groups.rs +++ b/core/server/src/http/consumer_groups.rs @@ -65,8 +65,6 @@ async fn get_consumer_group( let session = Session::stateless(identity.user_id, identity.ip_address); - // Check permissions and existence - state.shard.shard().ensure_authenticated(&session)?; let exists = state .shard .shard() @@ -117,8 +115,6 @@ async fn get_consumer_groups( let session = Session::stateless(identity.user_id, identity.ip_address); - // Check permissions and existence - state.shard.shard().ensure_authenticated(&session)?; state .shard .shard() diff --git a/core/server/src/http/jwt/middleware.rs b/core/server/src/http/jwt/middleware.rs index 591d3eea5..910e02ccb 100644 --- a/core/server/src/http/jwt/middleware.rs +++ b/core/server/src/http/jwt/middleware.rs @@ -38,7 +38,6 @@ const PUBLIC_PATHS: &[&str] = &[ "/", "/metrics", "/ping", - "/stats", "/users/login", "/users/refresh-token", "/personal-access-tokens/login", diff --git a/core/server/src/http/streams.rs b/core/server/src/http/streams.rs index 9054777ea..d96fd4e1e 100644 --- a/core/server/src/http/streams.rs +++ b/core/server/src/http/streams.rs @@ -55,6 +55,7 @@ pub fn router(state: Arc<AppState>) -> Router { #[debug_handler] async fn get_stream( State(state): State<Arc<AppState>>, + Extension(identity): Extension<Identity>, Path(stream_id): Path<String>, ) -> Result<Json<StreamDetails>, CustomError> { let stream_id = Identifier::from_str_value(&stream_id)?; @@ -63,6 +64,8 @@ async fn get_stream( return Err(CustomError::ResourceNotFound); } + let _session = Session::stateless(identity.user_id, identity.ip_address); + // Use direct slab access for thread-safe stream retrieval let stream_details = SendWrapper::new(|| { state @@ -78,7 +81,12 @@ async fn get_stream( } #[debug_handler] -async fn get_streams(State(state): State<Arc<AppState>>) -> Result<Json<Vec<Stream>>, CustomError> { +async fn get_streams( + State(state): State<Arc<AppState>>, + Extension(identity): Extension<Identity>, +) -> Result<Json<Vec<Stream>>, CustomError> { + let _session = Session::stateless(identity.user_id, identity.ip_address); + // Use direct slab access for thread-safe streams retrieval let streams = SendWrapper::new(|| { state.shard.shard().streams.with_components(|stream_ref| { diff --git a/core/server/src/http/system.rs b/core/server/src/http/system.rs index 42ae94bbf..af442a922 100644 --- a/core/server/src/http/system.rs +++ b/core/server/src/http/system.rs @@ -65,11 +65,18 @@ async fn get_metrics(State(state): State<Arc<AppState>>) -> Result<String, Custo } #[debug_handler] -async fn get_stats(State(state): State<Arc<AppState>>) -> Result<Json<Stats>, CustomError> { +async fn get_stats( + State(state): State<Arc<AppState>>, + Extension(identity): Extension<Identity>, +) -> Result<Json<Stats>, CustomError> { + let _session = Session::stateless(identity.user_id, identity.ip_address); let stats_future = SendWrapper::new(state.shard.shard().get_stats()); - let stats = stats_future - .await - .error(|e: &IggyError| format!("{COMPONENT} (error: {e}) - failed to get stats"))?; + let stats = stats_future.await.error(|e: &IggyError| { + format!( + "{COMPONENT} (error: {e}) - failed to get stats, user ID: {}", + identity.user_id + ) + })?; Ok(Json(stats)) } diff --git a/core/server/src/http/topics.rs b/core/server/src/http/topics.rs index 8341c2cfa..cc686e69f 100644 --- a/core/server/src/http/topics.rs +++ b/core/server/src/http/topics.rs @@ -69,8 +69,6 @@ async fn get_topic( let session = Session::stateless(identity.user_id, identity.ip_address); - // Check permissions and stream existence - state.shard.shard().ensure_authenticated(&session)?; let stream_exists = state .shard .shard() @@ -130,8 +128,6 @@ async fn get_topics( let stream_id = Identifier::from_str_value(&stream_id)?; let session = Session::stateless(identity.user_id, identity.ip_address); - // Check permissions and stream existence - state.shard.shard().ensure_authenticated(&session)?; state.shard.shard().ensure_stream_exists(&stream_id)?; let numeric_stream_id = state diff --git a/core/server/src/shard/system/clients.rs b/core/server/src/shard/system/clients.rs index 2259e5b11..fd01862a6 100644 --- a/core/server/src/shard/system/clients.rs +++ b/core/server/src/shard/system/clients.rs @@ -73,7 +73,6 @@ impl IggyShard { session: &Session, client_id: u32, ) -> Result<Option<Client>, IggyError> { - self.ensure_authenticated(session)?; self.permissioner .borrow() .get_client(session.get_user_id()) @@ -88,7 +87,6 @@ impl IggyShard { } pub fn get_clients(&self, session: &Session) -> Result<Vec<Client>, IggyError> { - self.ensure_authenticated(session)?; self.permissioner .borrow() .get_clients(session.get_user_id()) diff --git a/core/server/src/shard/system/consumer_groups.rs b/core/server/src/shard/system/consumer_groups.rs index 1f9610a22..7f04e5f6e 100644 --- a/core/server/src/shard/system/consumer_groups.rs +++ b/core/server/src/shard/system/consumer_groups.rs @@ -41,7 +41,6 @@ impl IggyShard { topic_id: &Identifier, name: String, ) -> Result<consumer_group::ConsumerGroup, IggyError> { - self.ensure_authenticated(session)?; self.ensure_topic_exists(stream_id, topic_id)?; let exists = self .streams @@ -115,7 +114,6 @@ impl IggyShard { topic_id: &Identifier, group_id: &Identifier, ) -> Result<consumer_group::ConsumerGroup, IggyError> { - self.ensure_authenticated(session)?; self.ensure_consumer_group_exists(stream_id, topic_id, group_id)?; { let topic_id = @@ -183,7 +181,6 @@ impl IggyShard { topic_id: &Identifier, group_id: &Identifier, ) -> Result<(), IggyError> { - self.ensure_authenticated(session)?; self.ensure_consumer_group_exists(stream_id, topic_id, group_id)?; { let topic_id = @@ -242,7 +239,6 @@ impl IggyShard { topic_id: &Identifier, group_id: &Identifier, ) -> Result<(), IggyError> { - self.ensure_authenticated(session)?; self.ensure_consumer_group_exists(stream_id, topic_id, group_id)?; { let topic_id = diff --git a/core/server/src/shard/system/consumer_offsets.rs b/core/server/src/shard/system/consumer_offsets.rs index 1777675a1..da85b75ea 100644 --- a/core/server/src/shard/system/consumer_offsets.rs +++ b/core/server/src/shard/system/consumer_offsets.rs @@ -40,7 +40,6 @@ impl IggyShard { partition_id: Option<u32>, offset: u64, ) -> Result<(PollingConsumer, usize), IggyError> { - self.ensure_authenticated(session)?; self.ensure_topic_exists(stream_id, topic_id)?; { let topic_id = @@ -93,7 +92,6 @@ impl IggyShard { topic_id: &Identifier, partition_id: Option<u32>, ) -> Result<Option<ConsumerOffsetInfo>, IggyError> { - self.ensure_authenticated(session)?; self.ensure_topic_exists(stream_id, topic_id)?; { let topic_id = @@ -153,7 +151,6 @@ impl IggyShard { topic_id: &Identifier, partition_id: Option<u32>, ) -> Result<(PollingConsumer, usize), IggyError> { - self.ensure_authenticated(session)?; self.ensure_topic_exists(stream_id, topic_id)?; { let topic_id = diff --git a/core/server/src/shard/system/partitions.rs b/core/server/src/shard/system/partitions.rs index aacbd52c0..7607fd804 100644 --- a/core/server/src/shard/system/partitions.rs +++ b/core/server/src/shard/system/partitions.rs @@ -68,7 +68,6 @@ impl IggyShard { topic_id: &Identifier, partitions_count: u32, ) -> Result<Vec<partition::Partition>, IggyError> { - self.ensure_authenticated(session)?; self.ensure_topic_exists(stream_id, topic_id)?; let numeric_stream_id = self .streams @@ -228,7 +227,6 @@ impl IggyShard { topic_id: &Identifier, partitions_count: u32, ) -> Result<Vec<usize>, IggyError> { - self.ensure_authenticated(session)?; self.ensure_partitions_exist(stream_id, topic_id, partitions_count)?; let numeric_stream_id = self diff --git a/core/server/src/shard/system/personal_access_tokens.rs b/core/server/src/shard/system/personal_access_tokens.rs index 7fe8dbbb3..6966193e4 100644 --- a/core/server/src/shard/system/personal_access_tokens.rs +++ b/core/server/src/shard/system/personal_access_tokens.rs @@ -32,7 +32,6 @@ impl IggyShard { &self, session: &Session, ) -> Result<Vec<PersonalAccessToken>, IggyError> { - self.ensure_authenticated(session)?; let user_id = session.get_user_id(); let user = self.get_user(&user_id.try_into()?).error(|e: &IggyError| { format!("{COMPONENT} (error: {e}) - failed to get user with id: {user_id}") @@ -57,7 +56,6 @@ impl IggyShard { name: &str, expiry: IggyExpiry, ) -> Result<(PersonalAccessToken, String), IggyError> { - self.ensure_authenticated(session)?; let user_id = session.get_user_id(); let identifier = user_id.try_into()?; { @@ -131,7 +129,6 @@ impl IggyShard { session: &Session, name: &str, ) -> Result<(), IggyError> { - self.ensure_authenticated(session)?; let user_id = session.get_user_id(); self.delete_personal_access_token_base(user_id, name) } diff --git a/core/server/src/shard/system/snapshot/mod.rs b/core/server/src/shard/system/snapshot/mod.rs index 35d94ef00..d157da8ab 100644 --- a/core/server/src/shard/system/snapshot/mod.rs +++ b/core/server/src/shard/system/snapshot/mod.rs @@ -43,12 +43,10 @@ use std::process::Command; impl IggyShard { pub async fn get_snapshot( &self, - session: &Session, + _session: &Session, compression: SnapshotCompression, snapshot_types: &Vec<SystemSnapshotType>, ) -> Result<Snapshot, IggyError> { - self.ensure_authenticated(session)?; - let snapshot_types = if snapshot_types.contains(&SystemSnapshotType::All) { if snapshot_types.len() > 1 { error!("When using 'All' snapshot type, no other types can be specified"); diff --git a/core/server/src/shard/system/streams.rs b/core/server/src/shard/system/streams.rs index 418532542..f64131c76 100644 --- a/core/server/src/shard/system/streams.rs +++ b/core/server/src/shard/system/streams.rs @@ -31,7 +31,6 @@ impl IggyShard { session: &Session, name: String, ) -> Result<stream::Stream, IggyError> { - self.ensure_authenticated(session)?; self.permissioner .borrow() .create_stream(session.get_user_id())?; @@ -63,7 +62,6 @@ impl IggyShard { stream_id: &Identifier, name: String, ) -> Result<(), IggyError> { - self.ensure_authenticated(session)?; self.ensure_stream_exists(stream_id)?; let id = self .streams @@ -129,7 +127,6 @@ impl IggyShard { session: &Session, id: &Identifier, ) -> Result<stream::Stream, IggyError> { - self.ensure_authenticated(session)?; self.ensure_stream_exists(id)?; let stream_id = self .streams @@ -178,7 +175,6 @@ impl IggyShard { session: &Session, stream_id: &Identifier, ) -> Result<(), IggyError> { - self.ensure_authenticated(session)?; self.ensure_stream_exists(stream_id)?; { let get_stream_id = crate::streaming::streams::helpers::get_stream_id(); diff --git a/core/server/src/shard/system/topics.rs b/core/server/src/shard/system/topics.rs index 042a49821..295a7d409 100644 --- a/core/server/src/shard/system/topics.rs +++ b/core/server/src/shard/system/topics.rs @@ -40,7 +40,6 @@ impl IggyShard { max_topic_size: MaxTopicSize, replication_factor: Option<u8>, ) -> Result<topic::Topic, IggyError> { - self.ensure_authenticated(session)?; self.ensure_stream_exists(stream_id)?; let numeric_stream_id = self.streams.get_index(stream_id); { @@ -103,7 +102,6 @@ impl IggyShard { max_topic_size: MaxTopicSize, replication_factor: Option<u8>, ) -> Result<(), IggyError> { - self.ensure_authenticated(session)?; self.ensure_topic_exists(stream_id, topic_id)?; { let topic_id_val = @@ -202,7 +200,6 @@ impl IggyShard { stream_id: &Identifier, topic_id: &Identifier, ) -> Result<topic::Topic, IggyError> { - self.ensure_authenticated(session)?; self.ensure_topic_exists(stream_id, topic_id)?; let numeric_topic_id = self.streams @@ -274,7 +271,6 @@ impl IggyShard { stream_id: &Identifier, topic_id: &Identifier, ) -> Result<(), IggyError> { - self.ensure_authenticated(session)?; self.ensure_topic_exists(stream_id, topic_id)?; { let topic_id = diff --git a/core/server/src/shard/system/users.rs b/core/server/src/shard/system/users.rs index 04a416444..31266d496 100644 --- a/core/server/src/shard/system/users.rs +++ b/core/server/src/shard/system/users.rs @@ -36,7 +36,6 @@ impl IggyShard { session: &Session, user_id: &Identifier, ) -> Result<Option<User>, IggyError> { - self.ensure_authenticated(session)?; let Some(user) = self.try_get_user(user_id)? else { return Ok(None); }; @@ -63,7 +62,6 @@ impl IggyShard { } pub async fn get_users(&self, session: &Session) -> Result<Vec<User>, IggyError> { - self.ensure_authenticated(session)?; self.permissioner .borrow() .get_users(session.get_user_id()) @@ -84,7 +82,6 @@ impl IggyShard { status: UserStatus, permissions: Option<Permissions>, ) -> Result<User, IggyError> { - self.ensure_authenticated(session)?; self.permissioner .borrow() .create_user(session.get_user_id()) @@ -148,7 +145,6 @@ impl IggyShard { } pub fn delete_user(&self, session: &Session, user_id: &Identifier) -> Result<User, IggyError> { - self.ensure_authenticated(session)?; self.permissioner .borrow() .delete_user(session.get_user_id()) @@ -204,7 +200,6 @@ impl IggyShard { username: Option<String>, status: Option<UserStatus>, ) -> Result<User, IggyError> { - self.ensure_authenticated(session)?; self.permissioner .borrow() .update_user(session.get_user_id()) @@ -266,8 +261,6 @@ impl IggyShard { user_id: &Identifier, permissions: Option<Permissions>, ) -> Result<(), IggyError> { - self.ensure_authenticated(session)?; - { self.permissioner .borrow() @@ -326,8 +319,6 @@ impl IggyShard { current_password: &str, new_password: &str, ) -> Result<(), IggyError> { - self.ensure_authenticated(session)?; - { let user = self.get_user(user_id).error(|e: &IggyError| { format!("{COMPONENT} (error: {e}) - failed to get user with id: {user_id}") @@ -437,7 +428,6 @@ impl IggyShard { } pub fn logout_user(&self, session: &Session) -> Result<(), IggyError> { - self.ensure_authenticated(session)?; let client_id = session.client_id; self.logout_user_base(client_id)?; Ok(())
