This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new e4460cb3e feat(server): require auth for stats endpoint (#2559)
e4460cb3e is described below
commit e4460cb3ed6839e86a1a547f21e78226b1a76509
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Jan 14 16:17:00 2026 +0100
feat(server): require auth for stats endpoint (#2559)
Remove /stats from PUBLIC_PATHS, add identity extraction
to handler. Add exhaustive auth scenario test with
EnumIter derive. Add ensure_authenticated to all binary
handlers that require authentication.
---
Cargo.lock | 3 +-
DEPENDENCIES.md | 2 +-
.../tests/tcp_test/offset_feature_deserialize.go | 5 +-
core/integration/Cargo.toml | 1 +
core/integration/tests/server/general.rs | 3 +-
core/integration/tests/server/mod.rs | 8 +-
.../server/scenarios/authentication_scenario.rs | 465 +++++++++++++++++++++
core/integration/tests/server/scenarios/mod.rs | 1 +
core/sdk/src/http/http_client.rs | 1 -
core/server/Cargo.toml | 2 +-
core/server/server.http | 33 +-
core/server/src/binary/command.rs | 2 +-
.../cluster/get_cluster_metadata_handler.rs | 1 +
.../create_consumer_group_handler.rs | 1 +
.../delete_consumer_group_handler.rs | 1 +
.../consumer_groups/join_consumer_group_handler.rs | 1 +
.../leave_consumer_group_handler.rs | 1 +
.../delete_consumer_offset_handler.rs | 1 +
.../get_consumer_offset_handler.rs | 1 +
.../store_consumer_offset_handler.rs | 1 +
.../messages/flush_unsaved_buffer_handler.rs | 1 +
.../handlers/messages/poll_messages_handler.rs | 1 +
.../handlers/messages/send_messages_handler.rs | 1 +
.../partitions/create_partitions_handler.rs | 1 +
.../partitions/delete_partitions_handler.rs | 1 +
.../create_personal_access_token_handler.rs | 1 +
.../delete_personal_access_token_handler.rs | 1 +
.../get_personal_access_tokens_handler.rs | 1 +
.../handlers/streams/create_stream_handler.rs | 1 +
.../handlers/streams/delete_stream_handler.rs | 1 +
.../handlers/streams/purge_stream_handler.rs | 1 +
.../handlers/streams/update_stream_handler.rs | 1 +
.../binary/handlers/system/get_client_handler.rs | 1 +
.../binary/handlers/system/get_clients_handler.rs | 1 +
.../src/binary/handlers/system/get_me_handler.rs | 1 +
.../{get_snapshot.rs => get_snapshot_handler.rs} | 1 +
.../binary/handlers/system/get_stats_handler.rs | 1 +
core/server/src/binary/handlers/system/mod.rs | 2 +-
.../binary/handlers/topics/create_topic_handler.rs | 1 +
.../binary/handlers/topics/delete_topic_handler.rs | 1 +
.../binary/handlers/topics/purge_topic_handler.rs | 1 +
.../binary/handlers/topics/update_topic_handler.rs | 1 +
.../handlers/users/change_password_handler.rs | 1 +
.../binary/handlers/users/create_user_handler.rs | 1 +
.../binary/handlers/users/delete_user_handler.rs | 1 +
.../src/binary/handlers/users/get_user_handler.rs | 1 +
.../src/binary/handlers/users/get_users_handler.rs | 1 +
.../binary/handlers/users/logout_user_handler.rs | 1 +
.../handlers/users/update_permissions_handler.rs | 1 +
.../binary/handlers/users/update_user_handler.rs | 1 +
core/server/src/binary/macros.rs | 2 +-
core/server/src/http/consumer_groups.rs | 4 -
core/server/src/http/jwt/middleware.rs | 1 -
core/server/src/http/streams.rs | 10 +-
core/server/src/http/system.rs | 15 +-
core/server/src/http/topics.rs | 4 -
core/server/src/shard/system/clients.rs | 2 -
core/server/src/shard/system/consumer_groups.rs | 4 -
core/server/src/shard/system/consumer_offsets.rs | 3 -
core/server/src/shard/system/partitions.rs | 2 -
.../src/shard/system/personal_access_tokens.rs | 3 -
core/server/src/shard/system/snapshot/mod.rs | 4 +-
core/server/src/shard/system/streams.rs | 4 -
core/server/src/shard/system/topics.rs | 4 -
core/server/src/shard/system/users.rs | 10 -
65 files changed, 560 insertions(+), 77 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 88953e444..a9e03e889 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5035,6 +5035,7 @@ dependencies = [
"serde_json",
"serial_test",
"server",
+ "strum 0.27.2",
"strum_macros 0.27.2",
"tempfile",
"test-case",
@@ -8290,7 +8291,7 @@ dependencies = [
[[package]]
name = "server"
-version = "0.6.1-edge.3"
+version = "0.6.1-edge.4"
dependencies = [
"ahash 0.8.12",
"anyhow",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 31143eead..ac097d204 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -729,7 +729,7 @@ serde_with_macros: 3.16.1, "Apache-2.0 OR MIT",
serde_yaml_ng: 0.10.0, "MIT",
serial_test: 3.2.0, "MIT",
serial_test_derive: 3.2.0, "MIT",
-server: 0.6.1-edge.3, "Apache-2.0",
+server: 0.6.1-edge.4, "Apache-2.0",
sha1: 0.10.6, "Apache-2.0 OR MIT",
sha2: 0.10.9, "Apache-2.0 OR MIT",
sha3: 0.10.8, "Apache-2.0 OR MIT",
diff --git a/bdd/go/tests/tcp_test/offset_feature_deserialize.go b/bdd/go/tests/tcp_test/offset_feature_deserialize.go
index abec1494e..d37808eb9 100644
--- a/bdd/go/tests/tcp_test/offset_feature_deserialize.go
+++ b/bdd/go/tests/tcp_test/offset_feature_deserialize.go
@@ -183,15 +183,14 @@ var _ = ginkgo.Describe("GET CONSUMER OFFSET:", func() {
consumer :=
iggcon.NewGroupConsumer(randomU32Identifier())
partitionId := uint32(1)
- offset, err := client.GetConsumerOffset(
+ _, err := client.GetConsumerOffset(
consumer,
randomU32Identifier(),
randomU32Identifier(),
&partitionId,
)
- itShouldNotReturnError(err)
- itShouldReturnNilOffsetForNewConsumerGroup(offset)
+ itShouldReturnUnauthenticatedError(err)
})
})
})
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index bc4160710..e30ef758c 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -58,6 +58,7 @@ rmcp = { version = "0.12.0", features = [
serde_json = { workspace = true }
serial_test = { workspace = true }
server = { workspace = true }
+strum = { workspace = true }
strum_macros = { workspace = true }
tempfile = { workspace = true }
test-case = { workspace = true }
diff --git a/core/integration/tests/server/general.rs b/core/integration/tests/server/general.rs
index 9aa0ef04c..f2bb3de10 100644
--- a/core/integration/tests/server/general.rs
+++ b/core/integration/tests/server/general.rs
@@ -16,7 +16,7 @@
// under the License.
use crate::server::{
- ScenarioFn, bench_scenario, consumer_timestamp_polling_scenario,
+ ScenarioFn, authentication_scenario, bench_scenario,
consumer_timestamp_polling_scenario,
create_message_payload_scenario, message_headers_scenario, run_scenario,
stream_size_validation_scenario, system_scenario, user_scenario,
};
@@ -27,6 +27,7 @@ use test_case::test_matrix;
#[test_matrix(
[quic(), tcp(), http(), websocket()],
[
+ authentication_scenario(),
system_scenario(),
user_scenario(),
message_headers_scenario(),
diff --git a/core/integration/tests/server/mod.rs b/core/integration/tests/server/mod.rs
index f1a3524ea..0b8fd502b 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/mod.rs
@@ -32,8 +32,8 @@ use integration::{
websocket_client::WebSocketClientFactory,
};
use scenarios::{
- bench_scenario, consumer_group_auto_commit_reconnection_scenario,
consumer_group_join_scenario,
- consumer_group_offset_cleanup_scenario,
+ authentication_scenario, bench_scenario,
consumer_group_auto_commit_reconnection_scenario,
+ consumer_group_join_scenario, consumer_group_offset_cleanup_scenario,
consumer_group_with_multiple_clients_polling_messages_scenario,
consumer_group_with_single_client_polling_messages_scenario,
consumer_timestamp_polling_scenario, create_message_payload,
message_headers_scenario,
@@ -46,6 +46,10 @@ use std::{collections::HashMap, future::Future};
type ScenarioFn = fn(&dyn ClientFactory) -> Pin<Box<dyn Future<Output = ()> +
'_>>;
+fn authentication_scenario() -> ScenarioFn {
+ |factory| Box::pin(authentication_scenario::run(factory))
+}
+
fn system_scenario() -> ScenarioFn {
|factory| Box::pin(system_scenario::run(factory))
}
diff --git a/core/integration/tests/server/scenarios/authentication_scenario.rs b/core/integration/tests/server/scenarios/authentication_scenario.rs
new file mode 100644
index 000000000..f793e49a4
--- /dev/null
+++ b/core/integration/tests/server/scenarios/authentication_scenario.rs
@@ -0,0 +1,465 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Authentication scenario test.
+//!
+//! Validates that all commands require authentication except:
+//! - Ping (health check)
+//! - LoginUser (authenticate with username/password)
+//! - LoginWithPersonalAccessToken (authenticate with PAT)
+//!
+//! Uses exhaustive matching on `ServerCommand` to ensure no command is missed
+//! when new commands are added.
+
+use crate::server::scenarios::create_client;
+use bytes::Bytes;
+use iggy::prelude::*;
+use integration::test_server::{ClientFactory, login_root};
+use server::binary::command::ServerCommand;
+use strum::IntoEnumIterator;
+
+const STREAM_NAME: &str = "auth-test-stream";
+const TOPIC_NAME: &str = "auth-test-topic";
+
+/// Shared test context with identifiers used across command tests.
+struct TestContext {
+ stream_id: Identifier,
+ topic_id: Identifier,
+ user_id: Identifier,
+ group_id: Identifier,
+ consumer: Consumer,
+}
+
+impl TestContext {
+ fn new() -> Self {
+ Self {
+ stream_id: Identifier::named(STREAM_NAME).unwrap(),
+ topic_id: Identifier::named(TOPIC_NAME).unwrap(),
+ user_id: Identifier::numeric(1).unwrap(),
+ group_id: Identifier::named("test-group").unwrap(),
+ consumer: Consumer::default(),
+ }
+ }
+}
+
+pub async fn run(client_factory: &dyn ClientFactory) {
+ let client = create_client(client_factory).await;
+
+ // Phase 1: Verify ping works without auth
+ client.ping().await.expect("ping should work without auth");
+
+ // Phase 2: Verify all protected commands fail without auth
+ test_all_commands_require_auth(&client).await;
+
+ // Phase 3: Login and verify commands work
+ let identity = login_root(&client).await;
+ assert_eq!(identity.user_id, 0, "root user should have id 0");
+ setup_test_resources(&client).await;
+ verify_auth_works(&client).await;
+
+ // Phase 4: Logout and verify commands fail again
+ client.logout_user().await.expect("logout should succeed");
+ test_all_commands_require_auth(&client).await;
+
+ // Phase 5: Test PAT authentication
+ login_root(&client).await;
+ let raw_pat = client
+ .create_personal_access_token("auth-test-pat",
PersonalAccessTokenExpiry::NeverExpire)
+ .await
+ .expect("should create PAT");
+
+ client.logout_user().await.expect("logout should succeed");
+ assert!(
+ client.get_streams().await.is_err(),
+ "should fail after logout"
+ );
+
+ let identity = client
+ .login_with_personal_access_token(&raw_pat.token)
+ .await
+ .expect("PAT login should work");
+ assert_eq!(identity.user_id, 0, "PAT should authenticate as root");
+
+ client
+ .get_streams()
+ .await
+ .expect("get_streams should work after PAT login");
+
+ cleanup_test_resources(&client).await;
+}
+
+/// Tests all commands require authentication using exhaustive matching.
+/// If a new command is added to ServerCommand, this match becomes non-exhaustive.
+async fn test_all_commands_require_auth(client: &IggyClient) {
+ let ctx = TestContext::new();
+
+ for cmd in ServerCommand::iter() {
+ let (name, result): (&str, Result<(), IggyError>) = match cmd {
+ // ================================================================
+ // NO AUTH REQUIRED (3 commands)
+ // ================================================================
+ ServerCommand::Ping(_)
+ | ServerCommand::LoginUser(_)
+ | ServerCommand::LoginWithPersonalAccessToken(_) => continue,
+
+ // ================================================================
+ // STATEFUL - NOT SUPPORTED ON HTTP (2 commands)
+ // ================================================================
+ ServerCommand::JoinConsumerGroup(_) |
ServerCommand::LeaveConsumerGroup(_) => continue,
+
+ // ================================================================
+ // SPECIAL SETUP REQUIRED (3 commands)
+ // ================================================================
+ ServerCommand::GetSnapshot(_)
+ | ServerCommand::DeleteSegments(_)
+ | ServerCommand::LogoutUser(_) => continue,
+
+ // ================================================================
+ // REQUIRES AUTH (39 commands)
+ // ================================================================
+
+ // System
+ ServerCommand::GetStats(_) => ("GetStats",
client.get_stats().await.map(|_| ())),
+ ServerCommand::GetMe(_) => ("GetMe", client.get_me().await.map(|_|
())),
+ ServerCommand::GetClient(_) => ("GetClient",
client.get_client(1).await.map(|_| ())),
+ ServerCommand::GetClients(_) => ("GetClients",
client.get_clients().await.map(|_| ())),
+ ServerCommand::GetClusterMetadata(_) => (
+ "GetClusterMetadata",
+ client.get_cluster_metadata().await.map(|_| ()),
+ ),
+
+ // Users
+ ServerCommand::GetUser(_) => {
+ ("GetUser", client.get_user(&ctx.user_id).await.map(|_| ()))
+ }
+ ServerCommand::GetUsers(_) => ("GetUsers",
client.get_users().await.map(|_| ())),
+ ServerCommand::CreateUser(_) => (
+ "CreateUser",
+ client
+ .create_user("test", "test", UserStatus::Active, None)
+ .await
+ .map(|_| ()),
+ ),
+ ServerCommand::DeleteUser(_) => ("DeleteUser",
client.delete_user(&ctx.user_id).await),
+ ServerCommand::UpdateUser(_) => (
+ "UpdateUser",
+ client.update_user(&ctx.user_id, Some("x"), None).await,
+ ),
+ ServerCommand::UpdatePermissions(_) => (
+ "UpdatePermissions",
+ client.update_permissions(&ctx.user_id, None).await,
+ ),
+ ServerCommand::ChangePassword(_) => (
+ "ChangePassword",
+ client.change_password(&ctx.user_id, "old", "new").await,
+ ),
+
+ // PAT
+ ServerCommand::GetPersonalAccessTokens(_) => (
+ "GetPersonalAccessTokens",
+ client.get_personal_access_tokens().await.map(|_| ()),
+ ),
+ ServerCommand::CreatePersonalAccessToken(_) => (
+ "CreatePersonalAccessToken",
+ client
+ .create_personal_access_token("x",
PersonalAccessTokenExpiry::NeverExpire)
+ .await
+ .map(|_| ()),
+ ),
+ ServerCommand::DeletePersonalAccessToken(_) => (
+ "DeletePersonalAccessToken",
+ client.delete_personal_access_token("x").await,
+ ),
+
+ // Streams
+ ServerCommand::GetStream(_) => (
+ "GetStream",
+ client.get_stream(&ctx.stream_id).await.map(|_| ()),
+ ),
+ ServerCommand::GetStreams(_) => ("GetStreams",
client.get_streams().await.map(|_| ())),
+ ServerCommand::CreateStream(_) => {
+ ("CreateStream", client.create_stream("x").await.map(|_| ()))
+ }
+ ServerCommand::DeleteStream(_) => {
+ ("DeleteStream", client.delete_stream(&ctx.stream_id).await)
+ }
+ ServerCommand::UpdateStream(_) => (
+ "UpdateStream",
+ client.update_stream(&ctx.stream_id, "x").await,
+ ),
+ ServerCommand::PurgeStream(_) => {
+ ("PurgeStream", client.purge_stream(&ctx.stream_id).await)
+ }
+
+ // Topics
+ ServerCommand::GetTopic(_) => (
+ "GetTopic",
+ client
+ .get_topic(&ctx.stream_id, &ctx.topic_id)
+ .await
+ .map(|_| ()),
+ ),
+ ServerCommand::GetTopics(_) => (
+ "GetTopics",
+ client.get_topics(&ctx.stream_id).await.map(|_| ()),
+ ),
+ ServerCommand::CreateTopic(_) => (
+ "CreateTopic",
+ client
+ .create_topic(
+ &ctx.stream_id,
+ "x",
+ 1,
+ CompressionAlgorithm::None,
+ None,
+ IggyExpiry::NeverExpire,
+ MaxTopicSize::ServerDefault,
+ )
+ .await
+ .map(|_| ()),
+ ),
+ ServerCommand::DeleteTopic(_) => (
+ "DeleteTopic",
+ client.delete_topic(&ctx.stream_id, &ctx.topic_id).await,
+ ),
+ ServerCommand::UpdateTopic(_) => (
+ "UpdateTopic",
+ client
+ .update_topic(
+ &ctx.stream_id,
+ &ctx.topic_id,
+ "x",
+ CompressionAlgorithm::None,
+ None,
+ IggyExpiry::NeverExpire,
+ MaxTopicSize::ServerDefault,
+ )
+ .await,
+ ),
+ ServerCommand::PurgeTopic(_) => (
+ "PurgeTopic",
+ client.purge_topic(&ctx.stream_id, &ctx.topic_id).await,
+ ),
+
+ // Partitions
+ ServerCommand::CreatePartitions(_) => (
+ "CreatePartitions",
+ client
+ .create_partitions(&ctx.stream_id, &ctx.topic_id, 1)
+ .await,
+ ),
+ ServerCommand::DeletePartitions(_) => (
+ "DeletePartitions",
+ client
+ .delete_partitions(&ctx.stream_id, &ctx.topic_id, 1)
+ .await,
+ ),
+
+ // Messages
+ ServerCommand::SendMessages(_) => {
+ let mut msgs = vec![
+ IggyMessage::builder()
+ .payload(Bytes::from("x"))
+ .build()
+ .unwrap(),
+ ];
+ (
+ "SendMessages",
+ client
+ .send_messages(
+ &ctx.stream_id,
+ &ctx.topic_id,
+ &Partitioning::partition_id(0),
+ &mut msgs,
+ )
+ .await,
+ )
+ }
+ ServerCommand::PollMessages(_) => (
+ "PollMessages",
+ client
+ .poll_messages(
+ &ctx.stream_id,
+ &ctx.topic_id,
+ Some(0),
+ &ctx.consumer,
+ &PollingStrategy::offset(0),
+ 1,
+ false,
+ )
+ .await
+ .map(|_| ()),
+ ),
+ ServerCommand::FlushUnsavedBuffer(_) => (
+ "FlushUnsavedBuffer",
+ client
+ .flush_unsaved_buffer(&ctx.stream_id, &ctx.topic_id, 0,
false)
+ .await,
+ ),
+
+ // Consumer Offsets
+ ServerCommand::GetConsumerOffset(_) => (
+ "GetConsumerOffset",
+ client
+ .get_consumer_offset(&ctx.consumer, &ctx.stream_id,
&ctx.topic_id, Some(0))
+ .await
+ .map(|_| ()),
+ ),
+ ServerCommand::StoreConsumerOffset(_) => (
+ "StoreConsumerOffset",
+ client
+ .store_consumer_offset(&ctx.consumer, &ctx.stream_id,
&ctx.topic_id, Some(0), 0)
+ .await,
+ ),
+ ServerCommand::DeleteConsumerOffset(_) => (
+ "DeleteConsumerOffset",
+ client
+ .delete_consumer_offset(&ctx.consumer, &ctx.stream_id,
&ctx.topic_id, Some(0))
+ .await,
+ ),
+
+ // Consumer Groups
+ ServerCommand::GetConsumerGroup(_) => (
+ "GetConsumerGroup",
+ client
+ .get_consumer_group(&ctx.stream_id, &ctx.topic_id,
&ctx.group_id)
+ .await
+ .map(|_| ()),
+ ),
+ ServerCommand::GetConsumerGroups(_) => (
+ "GetConsumerGroups",
+ client
+ .get_consumer_groups(&ctx.stream_id, &ctx.topic_id)
+ .await
+ .map(|_| ()),
+ ),
+ ServerCommand::CreateConsumerGroup(_) => (
+ "CreateConsumerGroup",
+ client
+ .create_consumer_group(&ctx.stream_id, &ctx.topic_id, "x")
+ .await
+ .map(|_| ()),
+ ),
+ ServerCommand::DeleteConsumerGroup(_) => (
+ "DeleteConsumerGroup",
+ client
+ .delete_consumer_group(&ctx.stream_id, &ctx.topic_id,
&ctx.group_id)
+ .await,
+ ),
+ };
+
+ assert_unauthenticated(result, name);
+ }
+}
+
+fn assert_unauthenticated(result: Result<(), IggyError>, name: &str) {
+ match result {
+ Err(e) if e.as_code() == IggyError::FeatureUnavailable.as_code() => {}
+ Err(e) if e.as_code() == IggyError::Unauthenticated.as_code() => {}
+ Err(e) => panic!(
+ "{name}: expected Unauthenticated ({}), got {:?} ({})",
+ IggyError::Unauthenticated.as_code(),
+ e,
+ e.as_code()
+ ),
+ Ok(()) => panic!("{name}: expected Unauthenticated, got Ok"),
+ }
+}
+
+/// Verifies a subset of commands work when authenticated.
+async fn verify_auth_works(client: &IggyClient) {
+ let ctx = TestContext::new();
+
+ // System
+ client.get_stats().await.expect("get_stats");
+ client.get_clients().await.expect("get_clients");
+ client
+ .get_cluster_metadata()
+ .await
+ .expect("get_cluster_metadata");
+
+ // Streams & Topics
+ client.get_streams().await.expect("get_streams");
+ client.get_stream(&ctx.stream_id).await.expect("get_stream");
+ client.get_topics(&ctx.stream_id).await.expect("get_topics");
+ client
+ .get_topic(&ctx.stream_id, &ctx.topic_id)
+ .await
+ .expect("get_topic");
+
+ // Messages
+ let mut msgs = vec![
+ IggyMessage::builder()
+ .payload(Bytes::from("test"))
+ .build()
+ .unwrap(),
+ ];
+ client
+ .send_messages(
+ &ctx.stream_id,
+ &ctx.topic_id,
+ &Partitioning::partition_id(0),
+ &mut msgs,
+ )
+ .await
+ .expect("send_messages");
+
+ client
+ .poll_messages(
+ &ctx.stream_id,
+ &ctx.topic_id,
+ Some(0),
+ &ctx.consumer,
+ &PollingStrategy::offset(0),
+ 10,
+ false,
+ )
+ .await
+ .expect("poll_messages");
+
+ // Users & PATs
+ client.get_users().await.expect("get_users");
+ client.get_personal_access_tokens().await.expect("get_pats");
+}
+
+async fn setup_test_resources(client: &IggyClient) {
+ client
+ .create_stream(STREAM_NAME)
+ .await
+ .expect("create stream");
+ client
+ .create_topic(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ TOPIC_NAME,
+ 1,
+ CompressionAlgorithm::None,
+ None,
+ IggyExpiry::NeverExpire,
+ MaxTopicSize::ServerDefault,
+ )
+ .await
+ .expect("create topic");
+}
+
+async fn cleanup_test_resources(client: &IggyClient) {
+ let _ = client.delete_personal_access_token("auth-test-pat").await;
+ client
+ .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+ .await
+ .expect("delete stream");
+}
diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs
index 57ac2ddaa..1676cefa3 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -16,6 +16,7 @@
* under the License.
*/
+pub mod authentication_scenario;
pub mod bench_scenario;
pub mod concurrent_scenario;
pub mod consumer_group_auto_commit_reconnection_scenario;
diff --git a/core/sdk/src/http/http_client.rs b/core/sdk/src/http/http_client.rs
index 05e63fab1..a6ddcaa65 100644
--- a/core/sdk/src/http/http_client.rs
+++ b/core/sdk/src/http/http_client.rs
@@ -38,7 +38,6 @@ const PUBLIC_PATHS: &[&str] = &[
"/",
"/metrics",
"/ping",
- "/stats",
"/users/login",
"/users/refresh-token",
"/personal-access-tokens/login",
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 3e3bb5dc4..fd72144be 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "server"
-version = "0.6.1-edge.3"
+version = "0.6.1-edge.4"
edition = "2024"
license = "Apache-2.0"
diff --git a/core/server/server.http b/core/server/server.http
index dd52429e4..02a7d2d13 100644
--- a/core/server/server.http
+++ b/core/server/server.http
@@ -42,11 +42,29 @@ GET {{url}}
###
GET {{url}}/ping
+###
+POST {{url}}/users/login
+Content-Type: application/json
+
+{
+ "username": "{{root_username}}",
+ "password": "{{root_password}}"
+}
+
+###
+POST {{url}}/personal-access-tokens/login
+Content-Type: application/json
+
+{
+ "token": "{{pat_raw_token}}"
+}
+
###
GET {{url}}/metrics
###
GET {{url}}/stats
+Authorization: Bearer {{access_token}}
###
GET {{url}}/cluster/metadata
@@ -60,16 +78,6 @@ Authorization: Bearer {{access_token}}
GET {{url}}/clients/{{client_id}}
Authorization: Bearer {{access_token}}
-
-###
-POST {{url}}/users/login
-Content-Type: application/json
-
-{
- "username": "{{root_username}}",
- "password": "{{root_password}}"
-}
-
###
POST {{url}}/users/refresh-token
Content-Type: application/json
@@ -183,12 +191,7 @@ Content-Type: application/json
}
###
-POST {{url}}/personal-access-tokens/login
-Content-Type: application/json
-{
- "token": "{{pat_raw_token}}"
-}
###
DELETE {{url}}/personal-access-tokens/{{pat_name}}
diff --git a/core/server/src/binary/command.rs b/core/server/src/binary/command.rs
index e10fe6ab8..78a2b4417 100644
--- a/core/server/src/binary/command.rs
+++ b/core/server/src/binary/command.rs
@@ -68,7 +68,7 @@ use iggy_common::update_topic::UpdateTopic;
use iggy_common::update_user::UpdateUser;
use iggy_common::*;
use std::rc::Rc;
-use strum::EnumString;
+use strum::{EnumIter, EnumString};
use tracing::error;
define_server_command_enum! {
diff --git a/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs b/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs
index af6c7a9a0..501c697ba 100644
--- a/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs
+++ b/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs
@@ -44,6 +44,7 @@ impl ServerCommandHandler for GetClusterMetadata {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let cluster_metadata = shard.get_cluster_metadata(session)?;
diff --git a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
index 192d867f5..fe1350a9f 100644
--- a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
+++ b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
@@ -49,6 +49,7 @@ impl ServerCommandHandler for CreateConsumerGroup {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let cg = shard.create_consumer_group(
session,
&self.stream_id,
diff --git a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
index 05c3cd9de..07751b591 100644
--- a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
+++ b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
@@ -49,6 +49,7 @@ impl ServerCommandHandler for DeleteConsumerGroup {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let cg = shard.delete_consumer_group(session, &self.stream_id,
&self.topic_id, &self.group_id).error(|e: &IggyError| {
format!(
"{COMPONENT} (error: {e}) - failed to delete consumer group
with ID: {} for topic with ID: {} in stream with ID: {} for session: {}",
diff --git a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
index cc9a8e27b..8aab2dca5 100644
--- a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
+++ b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
@@ -45,6 +45,7 @@ impl ServerCommandHandler for JoinConsumerGroup {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
shard
.join_consumer_group(
session,
diff --git a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
index b56d94a48..32f1c4d5a 100644
--- a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
+++ b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
@@ -46,6 +46,7 @@ impl ServerCommandHandler for LeaveConsumerGroup {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
shard
.leave_consumer_group(
diff --git a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
index 45f22349d..64d01255e 100644
--- a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
+++ b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
@@ -43,6 +43,7 @@ impl ServerCommandHandler for DeleteConsumerOffset {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
shard
.delete_consumer_offset(
session,
diff --git a/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs b/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs
index 29ee6f310..7b8991981 100644
--- a/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs
+++ b/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs
@@ -43,6 +43,7 @@ impl ServerCommandHandler for GetConsumerOffset {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let Ok(offset) = shard
.get_consumer_offset(
session,
diff --git a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
index dcbb79db3..c603f4cb8 100644
--- a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
+++ b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
@@ -45,6 +45,7 @@ impl ServerCommandHandler for StoreConsumerOffset {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
shard
.store_consumer_offset(
session,
diff --git a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
index 937a03cb3..008d1c600 100644
--- a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
+++ b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
@@ -43,6 +43,7 @@ impl ServerCommandHandler for FlushUnsavedBuffer {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let user_id = session.get_user_id();
let stream_id = self.stream_id.clone();
diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
index 0839e3a55..ea8ccbb8c 100644
--- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
@@ -57,6 +57,7 @@ impl ServerCommandHandler for PollMessages {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let PollMessages {
consumer,
partition_id,
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index 375848799..6aa6786ac 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -53,6 +53,7 @@ impl ServerCommandHandler for SendMessages {
session: &Session,
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
+ shard.ensure_authenticated(session)?;
let total_payload_size = length as usize - std::mem::size_of::<u32>();
let metadata_len_field_size = std::mem::size_of::<u32>();
diff --git a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
index 5c1930a8a..85b4e3030 100644
--- a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
@@ -49,6 +49,7 @@ impl ServerCommandHandler for CreatePartitions {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
// Acquire partition lock to serialize filesystem operations
let _partition_guard = shard.fs_locks.partition_lock.lock().await;
diff --git a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
index 9090ec2c8..f4bed945f 100644
--- a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
@@ -47,6 +47,7 @@ impl ServerCommandHandler for DeletePartitions {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let stream_id = self.stream_id.clone();
let topic_id = self.topic_id.clone();
diff --git
a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
index 50b896fb0..314f1821d 100644
---
a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
+++
b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
@@ -49,6 +49,7 @@ impl ServerCommandHandler for CreatePersonalAccessToken {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let (personal_access_token, token) = shard
.create_personal_access_token(session, &self.name, self.expiry)
diff --git
a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
index bf89c70ca..edb1e8e9a 100644
---
a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
+++
b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
@@ -45,6 +45,7 @@ impl ServerCommandHandler for DeletePersonalAccessToken {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let token_name = self.name.clone();
shard
diff --git
a/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs
b/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs
index 9e2923868..2f020c983 100644
---
a/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs
+++
b/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs
@@ -44,6 +44,7 @@ impl ServerCommandHandler for GetPersonalAccessTokens {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let personal_access_tokens = shard
.get_personal_access_tokens(session)
.error(|e: &IggyError| {
diff --git a/core/server/src/binary/handlers/streams/create_stream_handler.rs
b/core/server/src/binary/handlers/streams/create_stream_handler.rs
index c2935ef6a..4d9e85d5d 100644
--- a/core/server/src/binary/handlers/streams/create_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/create_stream_handler.rs
@@ -54,6 +54,7 @@ impl ServerCommandHandler for CreateStream {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let request = ShardRequest {
stream_id: Identifier::default(),
diff --git a/core/server/src/binary/handlers/streams/delete_stream_handler.rs
b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
index d1e23cb04..addc25af3 100644
--- a/core/server/src/binary/handlers/streams/delete_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
@@ -52,6 +52,7 @@ impl ServerCommandHandler for DeleteStream {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let stream_id = self.stream_id.clone();
let request = ShardRequest {
stream_id: Identifier::default(),
diff --git a/core/server/src/binary/handlers/streams/purge_stream_handler.rs
b/core/server/src/binary/handlers/streams/purge_stream_handler.rs
index bfa189613..c688ecd96 100644
--- a/core/server/src/binary/handlers/streams/purge_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/purge_stream_handler.rs
@@ -47,6 +47,7 @@ impl ServerCommandHandler for PurgeStream {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let stream_id = self.stream_id.clone();
shard
diff --git a/core/server/src/binary/handlers/streams/update_stream_handler.rs
b/core/server/src/binary/handlers/streams/update_stream_handler.rs
index 8e4de6574..3986f589c 100644
--- a/core/server/src/binary/handlers/streams/update_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/update_stream_handler.rs
@@ -47,6 +47,7 @@ impl ServerCommandHandler for UpdateStream {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let stream_id = self.stream_id.clone();
shard
.update_stream(session, &self.stream_id, self.name.clone())
diff --git a/core/server/src/binary/handlers/system/get_client_handler.rs
b/core/server/src/binary/handlers/system/get_client_handler.rs
index b9e16aef3..b62cd076e 100644
--- a/core/server/src/binary/handlers/system/get_client_handler.rs
+++ b/core/server/src/binary/handlers/system/get_client_handler.rs
@@ -42,6 +42,7 @@ impl ServerCommandHandler for GetClient {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let Ok(client) = shard.get_client(session, self.client_id) else {
sender.send_empty_ok_response().await?;
diff --git a/core/server/src/binary/handlers/system/get_clients_handler.rs
b/core/server/src/binary/handlers/system/get_clients_handler.rs
index 81ac21b5f..5e365fa8b 100644
--- a/core/server/src/binary/handlers/system/get_clients_handler.rs
+++ b/core/server/src/binary/handlers/system/get_clients_handler.rs
@@ -44,6 +44,7 @@ impl ServerCommandHandler for GetClients {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let clients = shard.get_clients(session).error(|e: &IggyError| {
format!("{COMPONENT} (error: {e}) - failed to get clients, session: {session}")
diff --git a/core/server/src/binary/handlers/system/get_me_handler.rs
b/core/server/src/binary/handlers/system/get_me_handler.rs
index 339063ab3..6f9209bb1 100644
--- a/core/server/src/binary/handlers/system/get_me_handler.rs
+++ b/core/server/src/binary/handlers/system/get_me_handler.rs
@@ -42,6 +42,7 @@ impl ServerCommandHandler for GetMe {
session: &Session,
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
+ shard.ensure_authenticated(session)?;
let Some(client) = shard.get_client(session, session.client_id).error(
|e: &IggyError| {
format!(
diff --git a/core/server/src/binary/handlers/system/get_snapshot.rs
b/core/server/src/binary/handlers/system/get_snapshot_handler.rs
similarity index 97%
rename from core/server/src/binary/handlers/system/get_snapshot.rs
rename to core/server/src/binary/handlers/system/get_snapshot_handler.rs
index cc2163b91..809f8fd60 100644
--- a/core/server/src/binary/handlers/system/get_snapshot.rs
+++ b/core/server/src/binary/handlers/system/get_snapshot_handler.rs
@@ -42,6 +42,7 @@ impl ServerCommandHandler for GetSnapshot {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let snapshot = shard
.get_snapshot(session, self.compression, &self.snapshot_types)
diff --git a/core/server/src/binary/handlers/system/get_stats_handler.rs
b/core/server/src/binary/handlers/system/get_stats_handler.rs
index 2fe368ce1..ea787bea6 100644
--- a/core/server/src/binary/handlers/system/get_stats_handler.rs
+++ b/core/server/src/binary/handlers/system/get_stats_handler.rs
@@ -48,6 +48,7 @@ impl ServerCommandHandler for GetStats {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
// Route GetStats to shard0 only
let request = ShardRequest {
diff --git a/core/server/src/binary/handlers/system/mod.rs
b/core/server/src/binary/handlers/system/mod.rs
index ae177f250..28398754a 100644
--- a/core/server/src/binary/handlers/system/mod.rs
+++ b/core/server/src/binary/handlers/system/mod.rs
@@ -19,7 +19,7 @@
pub mod get_client_handler;
pub mod get_clients_handler;
pub mod get_me_handler;
-pub mod get_snapshot;
+pub mod get_snapshot_handler;
pub mod get_stats_handler;
pub mod ping_handler;
diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs
b/core/server/src/binary/handlers/topics/create_topic_handler.rs
index fa5c0270a..95cbbe4c5 100644
--- a/core/server/src/binary/handlers/topics/create_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs
@@ -55,6 +55,7 @@ impl ServerCommandHandler for CreateTopic {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let request = ShardRequest {
stream_id: Identifier::default(),
diff --git a/core/server/src/binary/handlers/topics/delete_topic_handler.rs
b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
index 668dbd707..85db4d075 100644
--- a/core/server/src/binary/handlers/topics/delete_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
@@ -54,6 +54,7 @@ impl ServerCommandHandler for DeleteTopic {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let request = ShardRequest {
stream_id: Identifier::default(),
diff --git a/core/server/src/binary/handlers/topics/purge_topic_handler.rs
b/core/server/src/binary/handlers/topics/purge_topic_handler.rs
index 6e86293ae..012795da6 100644
--- a/core/server/src/binary/handlers/topics/purge_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/purge_topic_handler.rs
@@ -45,6 +45,7 @@ impl ServerCommandHandler for PurgeTopic {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let topic_id = self.topic_id.clone();
let stream_id = self.stream_id.clone();
diff --git a/core/server/src/binary/handlers/topics/update_topic_handler.rs
b/core/server/src/binary/handlers/topics/update_topic_handler.rs
index 90f5e11ec..aa8912fa7 100644
--- a/core/server/src/binary/handlers/topics/update_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/update_topic_handler.rs
@@ -52,6 +52,7 @@ impl ServerCommandHandler for UpdateTopic {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let request = ShardRequest {
stream_id: Identifier::default(),
diff --git a/core/server/src/binary/handlers/users/change_password_handler.rs
b/core/server/src/binary/handlers/users/change_password_handler.rs
index 0dadd98bb..9b11969cc 100644
--- a/core/server/src/binary/handlers/users/change_password_handler.rs
+++ b/core/server/src/binary/handlers/users/change_password_handler.rs
@@ -49,6 +49,7 @@ impl ServerCommandHandler for ChangePassword {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
info!("Changing password for user with ID: {}...", self.user_id);
shard
diff --git a/core/server/src/binary/handlers/users/create_user_handler.rs
b/core/server/src/binary/handlers/users/create_user_handler.rs
index 3cace71b3..57501efc0 100644
--- a/core/server/src/binary/handlers/users/create_user_handler.rs
+++ b/core/server/src/binary/handlers/users/create_user_handler.rs
@@ -54,6 +54,7 @@ impl ServerCommandHandler for CreateUser {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let request = ShardRequest {
stream_id: Identifier::default(),
diff --git a/core/server/src/binary/handlers/users/delete_user_handler.rs
b/core/server/src/binary/handlers/users/delete_user_handler.rs
index a7fa3ec1b..0c9151f91 100644
--- a/core/server/src/binary/handlers/users/delete_user_handler.rs
+++ b/core/server/src/binary/handlers/users/delete_user_handler.rs
@@ -53,6 +53,7 @@ impl ServerCommandHandler for DeleteUser {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let request = ShardRequest {
stream_id: Identifier::default(),
diff --git a/core/server/src/binary/handlers/users/get_user_handler.rs
b/core/server/src/binary/handlers/users/get_user_handler.rs
index 97322a340..4a60cdda4 100644
--- a/core/server/src/binary/handlers/users/get_user_handler.rs
+++ b/core/server/src/binary/handlers/users/get_user_handler.rs
@@ -43,6 +43,7 @@ impl ServerCommandHandler for GetUser {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let Ok(user) = shard.find_user(session, &self.user_id) else {
sender.send_empty_ok_response().await?;
return Ok(HandlerResult::Finished);
diff --git a/core/server/src/binary/handlers/users/get_users_handler.rs
b/core/server/src/binary/handlers/users/get_users_handler.rs
index bca683a0a..b0ff5e76e 100644
--- a/core/server/src/binary/handlers/users/get_users_handler.rs
+++ b/core/server/src/binary/handlers/users/get_users_handler.rs
@@ -45,6 +45,7 @@ impl ServerCommandHandler for GetUsers {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let users = shard.get_users(session).await.error(|e: &IggyError| {
format!("{COMPONENT} (error: {e}) - failed to get users, session: {session}")
})?;
diff --git a/core/server/src/binary/handlers/users/logout_user_handler.rs
b/core/server/src/binary/handlers/users/logout_user_handler.rs
index 714815173..f09a20b36 100644
--- a/core/server/src/binary/handlers/users/logout_user_handler.rs
+++ b/core/server/src/binary/handlers/users/logout_user_handler.rs
@@ -47,6 +47,7 @@ impl ServerCommandHandler for LogoutUser {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
info!("Logging out user with ID: {}...", session.get_user_id());
shard.logout_user(session).error(|e: &IggyError| {
format!("{COMPONENT} (error: {e}) - failed to logout user, session: {session}")
diff --git
a/core/server/src/binary/handlers/users/update_permissions_handler.rs
b/core/server/src/binary/handlers/users/update_permissions_handler.rs
index 30829a840..5e46b8a9b 100644
--- a/core/server/src/binary/handlers/users/update_permissions_handler.rs
+++ b/core/server/src/binary/handlers/users/update_permissions_handler.rs
@@ -49,6 +49,7 @@ impl ServerCommandHandler for UpdatePermissions {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
shard
.update_permissions(session, &self.user_id,
self.permissions.clone())
diff --git a/core/server/src/binary/handlers/users/update_user_handler.rs
b/core/server/src/binary/handlers/users/update_user_handler.rs
index bdb929136..433e024af 100644
--- a/core/server/src/binary/handlers/users/update_user_handler.rs
+++ b/core/server/src/binary/handlers/users/update_user_handler.rs
@@ -49,6 +49,7 @@ impl ServerCommandHandler for UpdateUser {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ shard.ensure_authenticated(session)?;
let user = shard
.update_user(
diff --git a/core/server/src/binary/macros.rs b/core/server/src/binary/macros.rs
index a56c0bee0..f69632c4c 100644
--- a/core/server/src/binary/macros.rs
+++ b/core/server/src/binary/macros.rs
@@ -33,7 +33,7 @@ macro_rules! define_server_command_enum {
);* $(;)?
) => {
#[enum_dispatch(ServerCommandHandler)]
- #[derive(Debug, PartialEq, EnumString)]
+ #[derive(Debug, PartialEq, EnumString, EnumIter)]
pub enum ServerCommand {
$(
$variant($ty),
diff --git a/core/server/src/http/consumer_groups.rs
b/core/server/src/http/consumer_groups.rs
index 5faf48adb..6f66de799 100644
--- a/core/server/src/http/consumer_groups.rs
+++ b/core/server/src/http/consumer_groups.rs
@@ -65,8 +65,6 @@ async fn get_consumer_group(
let session = Session::stateless(identity.user_id, identity.ip_address);
- // Check permissions and existence
- state.shard.shard().ensure_authenticated(&session)?;
let exists = state
.shard
.shard()
@@ -117,8 +115,6 @@ async fn get_consumer_groups(
let session = Session::stateless(identity.user_id, identity.ip_address);
- // Check permissions and existence
- state.shard.shard().ensure_authenticated(&session)?;
state
.shard
.shard()
diff --git a/core/server/src/http/jwt/middleware.rs
b/core/server/src/http/jwt/middleware.rs
index 591d3eea5..910e02ccb 100644
--- a/core/server/src/http/jwt/middleware.rs
+++ b/core/server/src/http/jwt/middleware.rs
@@ -38,7 +38,6 @@ const PUBLIC_PATHS: &[&str] = &[
"/",
"/metrics",
"/ping",
- "/stats",
"/users/login",
"/users/refresh-token",
"/personal-access-tokens/login",
diff --git a/core/server/src/http/streams.rs b/core/server/src/http/streams.rs
index 9054777ea..d96fd4e1e 100644
--- a/core/server/src/http/streams.rs
+++ b/core/server/src/http/streams.rs
@@ -55,6 +55,7 @@ pub fn router(state: Arc<AppState>) -> Router {
#[debug_handler]
async fn get_stream(
State(state): State<Arc<AppState>>,
+ Extension(identity): Extension<Identity>,
Path(stream_id): Path<String>,
) -> Result<Json<StreamDetails>, CustomError> {
let stream_id = Identifier::from_str_value(&stream_id)?;
@@ -63,6 +64,8 @@ async fn get_stream(
return Err(CustomError::ResourceNotFound);
}
+ let _session = Session::stateless(identity.user_id, identity.ip_address);
+
// Use direct slab access for thread-safe stream retrieval
let stream_details = SendWrapper::new(|| {
state
@@ -78,7 +81,12 @@ async fn get_stream(
}
#[debug_handler]
-async fn get_streams(State(state): State<Arc<AppState>>) -> Result<Json<Vec<Stream>>, CustomError> {
+async fn get_streams(
+ State(state): State<Arc<AppState>>,
+ Extension(identity): Extension<Identity>,
+) -> Result<Json<Vec<Stream>>, CustomError> {
+ let _session = Session::stateless(identity.user_id, identity.ip_address);
+
// Use direct slab access for thread-safe streams retrieval
let streams = SendWrapper::new(|| {
state.shard.shard().streams.with_components(|stream_ref| {
diff --git a/core/server/src/http/system.rs b/core/server/src/http/system.rs
index 42ae94bbf..af442a922 100644
--- a/core/server/src/http/system.rs
+++ b/core/server/src/http/system.rs
@@ -65,11 +65,18 @@ async fn get_metrics(State(state): State<Arc<AppState>>) -> Result<String, Custo
}
#[debug_handler]
-async fn get_stats(State(state): State<Arc<AppState>>) -> Result<Json<Stats>, CustomError> {
+async fn get_stats(
+ State(state): State<Arc<AppState>>,
+ Extension(identity): Extension<Identity>,
+) -> Result<Json<Stats>, CustomError> {
+ let _session = Session::stateless(identity.user_id, identity.ip_address);
let stats_future = SendWrapper::new(state.shard.shard().get_stats());
- let stats = stats_future
- .await
- .error(|e: &IggyError| format!("{COMPONENT} (error: {e}) - failed to get stats"))?;
+ let stats = stats_future.await.error(|e: &IggyError| {
+ format!(
+ "{COMPONENT} (error: {e}) - failed to get stats, user ID: {}",
+ identity.user_id
+ )
+ })?;
Ok(Json(stats))
}
diff --git a/core/server/src/http/topics.rs b/core/server/src/http/topics.rs
index 8341c2cfa..cc686e69f 100644
--- a/core/server/src/http/topics.rs
+++ b/core/server/src/http/topics.rs
@@ -69,8 +69,6 @@ async fn get_topic(
let session = Session::stateless(identity.user_id, identity.ip_address);
- // Check permissions and stream existence
- state.shard.shard().ensure_authenticated(&session)?;
let stream_exists = state
.shard
.shard()
@@ -130,8 +128,6 @@ async fn get_topics(
let stream_id = Identifier::from_str_value(&stream_id)?;
let session = Session::stateless(identity.user_id, identity.ip_address);
- // Check permissions and stream existence
- state.shard.shard().ensure_authenticated(&session)?;
state.shard.shard().ensure_stream_exists(&stream_id)?;
let numeric_stream_id = state
diff --git a/core/server/src/shard/system/clients.rs
b/core/server/src/shard/system/clients.rs
index 2259e5b11..fd01862a6 100644
--- a/core/server/src/shard/system/clients.rs
+++ b/core/server/src/shard/system/clients.rs
@@ -73,7 +73,6 @@ impl IggyShard {
session: &Session,
client_id: u32,
) -> Result<Option<Client>, IggyError> {
- self.ensure_authenticated(session)?;
self.permissioner
.borrow()
.get_client(session.get_user_id())
@@ -88,7 +87,6 @@ impl IggyShard {
}
pub fn get_clients(&self, session: &Session) -> Result<Vec<Client>,
IggyError> {
- self.ensure_authenticated(session)?;
self.permissioner
.borrow()
.get_clients(session.get_user_id())
diff --git a/core/server/src/shard/system/consumer_groups.rs
b/core/server/src/shard/system/consumer_groups.rs
index 1f9610a22..7f04e5f6e 100644
--- a/core/server/src/shard/system/consumer_groups.rs
+++ b/core/server/src/shard/system/consumer_groups.rs
@@ -41,7 +41,6 @@ impl IggyShard {
topic_id: &Identifier,
name: String,
) -> Result<consumer_group::ConsumerGroup, IggyError> {
- self.ensure_authenticated(session)?;
self.ensure_topic_exists(stream_id, topic_id)?;
let exists = self
.streams
@@ -115,7 +114,6 @@ impl IggyShard {
topic_id: &Identifier,
group_id: &Identifier,
) -> Result<consumer_group::ConsumerGroup, IggyError> {
- self.ensure_authenticated(session)?;
self.ensure_consumer_group_exists(stream_id, topic_id, group_id)?;
{
let topic_id =
@@ -183,7 +181,6 @@ impl IggyShard {
topic_id: &Identifier,
group_id: &Identifier,
) -> Result<(), IggyError> {
- self.ensure_authenticated(session)?;
self.ensure_consumer_group_exists(stream_id, topic_id, group_id)?;
{
let topic_id =
@@ -242,7 +239,6 @@ impl IggyShard {
topic_id: &Identifier,
group_id: &Identifier,
) -> Result<(), IggyError> {
- self.ensure_authenticated(session)?;
self.ensure_consumer_group_exists(stream_id, topic_id, group_id)?;
{
let topic_id =
diff --git a/core/server/src/shard/system/consumer_offsets.rs
b/core/server/src/shard/system/consumer_offsets.rs
index 1777675a1..da85b75ea 100644
--- a/core/server/src/shard/system/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -40,7 +40,6 @@ impl IggyShard {
partition_id: Option<u32>,
offset: u64,
) -> Result<(PollingConsumer, usize), IggyError> {
- self.ensure_authenticated(session)?;
self.ensure_topic_exists(stream_id, topic_id)?;
{
let topic_id =
@@ -93,7 +92,6 @@ impl IggyShard {
topic_id: &Identifier,
partition_id: Option<u32>,
) -> Result<Option<ConsumerOffsetInfo>, IggyError> {
- self.ensure_authenticated(session)?;
self.ensure_topic_exists(stream_id, topic_id)?;
{
let topic_id =
@@ -153,7 +151,6 @@ impl IggyShard {
topic_id: &Identifier,
partition_id: Option<u32>,
) -> Result<(PollingConsumer, usize), IggyError> {
- self.ensure_authenticated(session)?;
self.ensure_topic_exists(stream_id, topic_id)?;
{
let topic_id =
diff --git a/core/server/src/shard/system/partitions.rs
b/core/server/src/shard/system/partitions.rs
index aacbd52c0..7607fd804 100644
--- a/core/server/src/shard/system/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -68,7 +68,6 @@ impl IggyShard {
topic_id: &Identifier,
partitions_count: u32,
) -> Result<Vec<partition::Partition>, IggyError> {
- self.ensure_authenticated(session)?;
self.ensure_topic_exists(stream_id, topic_id)?;
let numeric_stream_id = self
.streams
@@ -228,7 +227,6 @@ impl IggyShard {
topic_id: &Identifier,
partitions_count: u32,
) -> Result<Vec<usize>, IggyError> {
- self.ensure_authenticated(session)?;
self.ensure_partitions_exist(stream_id, topic_id, partitions_count)?;
let numeric_stream_id = self
diff --git a/core/server/src/shard/system/personal_access_tokens.rs
b/core/server/src/shard/system/personal_access_tokens.rs
index 7fe8dbbb3..6966193e4 100644
--- a/core/server/src/shard/system/personal_access_tokens.rs
+++ b/core/server/src/shard/system/personal_access_tokens.rs
@@ -32,7 +32,6 @@ impl IggyShard {
&self,
session: &Session,
) -> Result<Vec<PersonalAccessToken>, IggyError> {
- self.ensure_authenticated(session)?;
let user_id = session.get_user_id();
let user = self.get_user(&user_id.try_into()?).error(|e: &IggyError| {
format!("{COMPONENT} (error: {e}) - failed to get user with id: {user_id}")
@@ -57,7 +56,6 @@ impl IggyShard {
name: &str,
expiry: IggyExpiry,
) -> Result<(PersonalAccessToken, String), IggyError> {
- self.ensure_authenticated(session)?;
let user_id = session.get_user_id();
let identifier = user_id.try_into()?;
{
@@ -131,7 +129,6 @@ impl IggyShard {
session: &Session,
name: &str,
) -> Result<(), IggyError> {
- self.ensure_authenticated(session)?;
let user_id = session.get_user_id();
self.delete_personal_access_token_base(user_id, name)
}
diff --git a/core/server/src/shard/system/snapshot/mod.rs
b/core/server/src/shard/system/snapshot/mod.rs
index 35d94ef00..d157da8ab 100644
--- a/core/server/src/shard/system/snapshot/mod.rs
+++ b/core/server/src/shard/system/snapshot/mod.rs
@@ -43,12 +43,10 @@ use std::process::Command;
impl IggyShard {
pub async fn get_snapshot(
&self,
- session: &Session,
+ _session: &Session,
compression: SnapshotCompression,
snapshot_types: &Vec<SystemSnapshotType>,
) -> Result<Snapshot, IggyError> {
- self.ensure_authenticated(session)?;
-
let snapshot_types = if
snapshot_types.contains(&SystemSnapshotType::All) {
if snapshot_types.len() > 1 {
error!("When using 'All' snapshot type, no other types can be specified");
diff --git a/core/server/src/shard/system/streams.rs
b/core/server/src/shard/system/streams.rs
index 418532542..f64131c76 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -31,7 +31,6 @@ impl IggyShard {
session: &Session,
name: String,
) -> Result<stream::Stream, IggyError> {
- self.ensure_authenticated(session)?;
self.permissioner
.borrow()
.create_stream(session.get_user_id())?;
@@ -63,7 +62,6 @@ impl IggyShard {
stream_id: &Identifier,
name: String,
) -> Result<(), IggyError> {
- self.ensure_authenticated(session)?;
self.ensure_stream_exists(stream_id)?;
let id = self
.streams
@@ -129,7 +127,6 @@ impl IggyShard {
session: &Session,
id: &Identifier,
) -> Result<stream::Stream, IggyError> {
- self.ensure_authenticated(session)?;
self.ensure_stream_exists(id)?;
let stream_id = self
.streams
@@ -178,7 +175,6 @@ impl IggyShard {
session: &Session,
stream_id: &Identifier,
) -> Result<(), IggyError> {
- self.ensure_authenticated(session)?;
self.ensure_stream_exists(stream_id)?;
{
let get_stream_id =
crate::streaming::streams::helpers::get_stream_id();
diff --git a/core/server/src/shard/system/topics.rs
b/core/server/src/shard/system/topics.rs
index 042a49821..295a7d409 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -40,7 +40,6 @@ impl IggyShard {
max_topic_size: MaxTopicSize,
replication_factor: Option<u8>,
) -> Result<topic::Topic, IggyError> {
- self.ensure_authenticated(session)?;
self.ensure_stream_exists(stream_id)?;
let numeric_stream_id = self.streams.get_index(stream_id);
{
@@ -103,7 +102,6 @@ impl IggyShard {
max_topic_size: MaxTopicSize,
replication_factor: Option<u8>,
) -> Result<(), IggyError> {
- self.ensure_authenticated(session)?;
self.ensure_topic_exists(stream_id, topic_id)?;
{
let topic_id_val =
@@ -202,7 +200,6 @@ impl IggyShard {
stream_id: &Identifier,
topic_id: &Identifier,
) -> Result<topic::Topic, IggyError> {
- self.ensure_authenticated(session)?;
self.ensure_topic_exists(stream_id, topic_id)?;
let numeric_topic_id =
self.streams
@@ -274,7 +271,6 @@ impl IggyShard {
stream_id: &Identifier,
topic_id: &Identifier,
) -> Result<(), IggyError> {
- self.ensure_authenticated(session)?;
self.ensure_topic_exists(stream_id, topic_id)?;
{
let topic_id =
diff --git a/core/server/src/shard/system/users.rs
b/core/server/src/shard/system/users.rs
index 04a416444..31266d496 100644
--- a/core/server/src/shard/system/users.rs
+++ b/core/server/src/shard/system/users.rs
@@ -36,7 +36,6 @@ impl IggyShard {
session: &Session,
user_id: &Identifier,
) -> Result<Option<User>, IggyError> {
- self.ensure_authenticated(session)?;
let Some(user) = self.try_get_user(user_id)? else {
return Ok(None);
};
@@ -63,7 +62,6 @@ impl IggyShard {
}
pub async fn get_users(&self, session: &Session) -> Result<Vec<User>,
IggyError> {
- self.ensure_authenticated(session)?;
self.permissioner
.borrow()
.get_users(session.get_user_id())
@@ -84,7 +82,6 @@ impl IggyShard {
status: UserStatus,
permissions: Option<Permissions>,
) -> Result<User, IggyError> {
- self.ensure_authenticated(session)?;
self.permissioner
.borrow()
.create_user(session.get_user_id())
@@ -148,7 +145,6 @@ impl IggyShard {
}
pub fn delete_user(&self, session: &Session, user_id: &Identifier) ->
Result<User, IggyError> {
- self.ensure_authenticated(session)?;
self.permissioner
.borrow()
.delete_user(session.get_user_id())
@@ -204,7 +200,6 @@ impl IggyShard {
username: Option<String>,
status: Option<UserStatus>,
) -> Result<User, IggyError> {
- self.ensure_authenticated(session)?;
self.permissioner
.borrow()
.update_user(session.get_user_id())
@@ -266,8 +261,6 @@ impl IggyShard {
user_id: &Identifier,
permissions: Option<Permissions>,
) -> Result<(), IggyError> {
- self.ensure_authenticated(session)?;
-
{
self.permissioner
.borrow()
@@ -326,8 +319,6 @@ impl IggyShard {
current_password: &str,
new_password: &str,
) -> Result<(), IggyError> {
- self.ensure_authenticated(session)?;
-
{
let user = self.get_user(user_id).error(|e: &IggyError| {
format!("{COMPONENT} (error: {e}) - failed to get user with id: {user_id}")
@@ -437,7 +428,6 @@ impl IggyShard {
}
pub fn logout_user(&self, session: &Session) -> Result<(), IggyError> {
- self.ensure_authenticated(session)?;
let client_id = session.client_id;
self.logout_user_base(client_id)?;
Ok(())