This is an automated email from the ASF dual-hosted git repository.
maciej pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 25dcd7634 refactor(repo): wrap credentials in SecretString to prevent
leaks (#2931)
25dcd7634 is described below
commit 25dcd7634917bba21c170b381485d6d176524f90
Author: Grainier Perera <[email protected]>
AuthorDate: Tue Mar 17 11:40:57 2026 +0530
refactor(repo): wrap credentials in SecretString to prevent leaks (#2931)
Closes #2728
---
Cargo.lock | 14 +++
Cargo.toml | 1 +
core/cli/Cargo.toml | 1 +
.../create_personal_access_token.rs | 7 +-
core/cli/src/commands/binary_system/login.rs | 3 +-
core/cli/src/commands/binary_users/create_user.rs | 18 ++--
core/cli/src/credentials.rs | 43 +++++----
core/common/Cargo.toml | 1 +
.../login_with_personal_access_token.rs | 44 ++++++---
core/common/src/commands/users/change_password.rs | 82 +++++++++++-----
core/common/src/commands/users/create_user.rs | 59 +++++++++---
core/common/src/commands/users/login_user.rs | 65 +++++++++----
core/common/src/lib.rs | 1 +
.../traits/binary_impls/personal_access_tokens.rs | 3 +-
core/common/src/traits/binary_impls/users.rs | 9 +-
core/common/src/traits/binary_mapper.rs | 5 +-
.../types/configuration/auth_config/auto_login.rs | 2 +-
.../configuration/auth_config/connection_string.rs | 44 +++++----
.../types/configuration/auth_config/credentials.rs | 25 ++++-
.../src/types/permissions/personal_access_token.rs | 16 +++-
core/common/src/types/user/user_identity_info.rs | 17 +++-
core/common/src/utils/mod.rs | 1 +
core/common/src/utils/serde_secret.rs | 106 +++++++++++++++++++++
core/connectors/runtime/Cargo.toml | 1 +
core/connectors/runtime/src/api/auth.rs | 5 +-
core/connectors/runtime/src/api/config.rs | 28 ++++--
core/connectors/runtime/src/context.rs | 5 +-
.../connectors/sinks/elasticsearch_sink/Cargo.toml | 1 +
.../connectors/sinks/elasticsearch_sink/src/lib.rs | 7 +-
core/connectors/sinks/mongodb_sink/Cargo.toml | 2 +
core/connectors/sinks/mongodb_sink/src/lib.rs | 10 +-
core/connectors/sinks/postgres_sink/Cargo.toml | 1 +
core/connectors/sinks/postgres_sink/src/lib.rs | 10 +-
.../sources/elasticsearch_source/Cargo.toml | 1 +
.../sources/elasticsearch_source/src/lib.rs | 7 +-
core/connectors/sources/postgres_source/Cargo.toml | 1 +
core/connectors/sources/postgres_source/src/lib.rs | 10 +-
core/integration/Cargo.toml | 1 +
.../test_pat_login_options.rs | 7 +-
.../tests/cli/user/test_user_create_command.rs | 8 +-
core/integration/tests/mcp/mod.rs | 3 +-
.../server/scenarios/authentication_scenario.rs | 3 +-
.../scenarios/cross_protocol_pat_scenario.rs | 5 +-
.../server/scenarios/purge_delete_scenario.rs | 3 +-
.../stale_client_consumer_group_scenario.rs | 3 +-
.../tests/server/scenarios/user_scenario.rs | 9 +-
core/integration/tests/state/file.rs | 7 +-
core/integration/tests/state/system.rs | 10 +-
core/metadata/Cargo.toml | 1 +
core/metadata/src/stm/user.rs | 5 +-
core/sdk/Cargo.toml | 1 +
core/sdk/src/client_provider.rs | 7 +-
core/sdk/src/http/http_client.rs | 3 +-
core/sdk/src/http/personal_access_tokens.rs | 3 +-
core/sdk/src/http/users.rs | 9 +-
core/sdk/src/quic/quic_client.rs | 44 +++++----
core/sdk/src/tcp/tcp_client.rs | 44 +++++----
core/sdk/src/websocket/websocket_client.rs | 6 +-
core/server/Cargo.toml | 1 +
core/server/src/binary/command.rs | 8 +-
.../login_with_personal_access_token_handler.rs | 11 +--
.../binary/handlers/users/login_user_handler.rs | 3 +-
core/server/src/binary/macros.rs | 2 +-
core/server/src/http/mapper.rs | 3 +-
core/server/src/http/personal_access_tokens.rs | 7 +-
core/server/src/http/users.rs | 3 +-
core/server/src/shard/execution.rs | 17 ++--
core/server/src/state/command.rs | 2 +-
core/server/src/state/models.rs | 2 +-
core/server/src/state/system.rs | 37 ++++++-
70 files changed, 673 insertions(+), 261 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 3afb4cadd..faa885be4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5143,6 +5143,7 @@ dependencies = [
"reqwest-retry",
"reqwest-tracing",
"rustls",
+ "secrecy",
"serde",
"tokio",
"tokio-rustls",
@@ -5227,6 +5228,7 @@ dependencies = [
"iggy_common",
"keyring",
"passterm",
+ "secrecy",
"serde",
"serde_json",
"thiserror 2.0.18",
@@ -5272,6 +5274,7 @@ dependencies = [
"reqwest-middleware",
"reqwest-retry",
"reqwest-tracing",
+ "secrecy",
"serde",
"serde_json",
"serde_with",
@@ -5356,6 +5359,7 @@ dependencies = [
"rcgen",
"ring",
"rustls",
+ "secrecy",
"serde",
"serde_json",
"serde_with",
@@ -5381,6 +5385,7 @@ dependencies = [
"iggy_common",
"iggy_connector_sdk",
"once_cell",
+ "secrecy",
"serde",
"serde_json",
"simd-json",
@@ -5399,6 +5404,7 @@ dependencies = [
"iggy_common",
"iggy_connector_sdk",
"once_cell",
+ "secrecy",
"serde",
"serde_json",
"simd-json",
@@ -5431,8 +5437,10 @@ version = "0.3.0"
dependencies = [
"async-trait",
"humantime",
+ "iggy_common",
"iggy_connector_sdk",
"mongodb",
+ "secrecy",
"serde",
"serde_json",
"tokio",
@@ -5450,6 +5458,7 @@ dependencies = [
"iggy_common",
"iggy_connector_sdk",
"once_cell",
+ "secrecy",
"serde",
"serde_json",
"simd-json",
@@ -5470,6 +5479,7 @@ dependencies = [
"iggy_common",
"iggy_connector_sdk",
"once_cell",
+ "secrecy",
"serde",
"serde_json",
"simd-json",
@@ -5767,6 +5777,7 @@ dependencies = [
"reqwest-middleware",
"reqwest-retry",
"rmcp",
+ "secrecy",
"serde",
"serde_json",
"serial_test",
@@ -6572,6 +6583,7 @@ dependencies = [
"message_bus",
"paste",
"rmp-serde",
+ "secrecy",
"serde",
"slab",
"tracing",
@@ -9331,6 +9343,7 @@ version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e891af845473308773346dc847b2c23ee78fe442e0472ac50e22a18a93d3ae5a"
dependencies = [
+ "serde",
"zeroize",
]
@@ -9656,6 +9669,7 @@ dependencies = [
"rust-embed",
"rustls",
"rustls-pemfile",
+ "secrecy",
"send_wrapper",
"serde",
"slab",
diff --git a/Cargo.toml b/Cargo.toml
index b3ea5a2bf..539251436 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -237,6 +237,7 @@ rust-embed = "8.11.0"
rust-s3 = { version = "0.37.1", default-features = false, features =
["tokio-rustls-tls", "tags"] }
rustls = { version = "0.23.37", features = ["ring"] }
rustls-pemfile = "2.2.0"
+secrecy = { version = "0.10", features = ["serde"] }
send_wrapper = "0.6.0"
serde = { version = "1.0.228", features = ["derive", "rc"] }
serde_json = "1.0.149"
diff --git a/core/cli/Cargo.toml b/core/cli/Cargo.toml
index d2bbca358..cbd35a625 100644
--- a/core/cli/Cargo.toml
+++ b/core/cli/Cargo.toml
@@ -58,6 +58,7 @@ iggy = { workspace = true }
iggy_common = { workspace = true }
keyring = { workspace = true, optional = true }
passterm = { workspace = true }
+secrecy = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
diff --git
a/core/cli/src/commands/binary_personal_access_tokens/create_personal_access_token.rs
b/core/cli/src/commands/binary_personal_access_tokens/create_personal_access_token.rs
index 4fa96f700..bf0aef95f 100644
---
a/core/cli/src/commands/binary_personal_access_tokens/create_personal_access_token.rs
+++
b/core/cli/src/commands/binary_personal_access_tokens/create_personal_access_token.rs
@@ -23,6 +23,7 @@ use iggy_common::Client;
use iggy_common::PersonalAccessTokenExpiry;
use iggy_common::create_personal_access_token::CreatePersonalAccessToken;
use keyring::Entry;
+use secrecy::ExposeSecret;
use tracing::{Level, event};
pub struct CreatePersonalAccessTokenCmd {
@@ -84,7 +85,7 @@ impl CliCommand for CreatePersonalAccessTokenCmd {
if self.store_token {
let server_address = format!("iggy:{}", self.server_address);
let entry = Entry::new(&server_address, &self.create_token.name)?;
- entry.set_password(&token.token)?;
+ entry.set_password(token.token.expose_secret())?;
event!(target: PRINT_TARGET, Level::DEBUG,"Stored token under
service: {} and name: {}", server_address,
self.create_token.name);
event!(target: PRINT_TARGET, Level::INFO,
@@ -96,7 +97,7 @@ impl CliCommand for CreatePersonalAccessTokenCmd {
},
);
} else if self.quiet_mode {
- println!("{}", token.token);
+ println!("{}", token.token.expose_secret());
} else {
event!(target: PRINT_TARGET, Level::INFO,
"Personal access token with name: {} and {} created",
@@ -107,7 +108,7 @@ impl CliCommand for CreatePersonalAccessTokenCmd {
},
);
event!(target: PRINT_TARGET, Level::INFO,"Token: {}",
- token.token);
+ token.token.expose_secret());
}
Ok(())
diff --git a/core/cli/src/commands/binary_system/login.rs
b/core/cli/src/commands/binary_system/login.rs
index 7c99c1db9..efac41632 100644
--- a/core/cli/src/commands/binary_system/login.rs
+++ b/core/cli/src/commands/binary_system/login.rs
@@ -23,6 +23,7 @@ use anyhow::Context;
use async_trait::async_trait;
use iggy_common::Client;
use iggy_common::SEC_IN_MICRO;
+use secrecy::ExposeSecret;
use tracing::{Level, event};
const DEFAULT_LOGIN_SESSION_TIMEOUT: u64 = SEC_IN_MICRO * 15 * 60;
@@ -94,7 +95,7 @@ impl CliCommand for LoginCmd {
)
})?;
- self.server_session.store(&token.token)?;
+ self.server_session.store(token.token.expose_secret())?;
event!(target: PRINT_TARGET, Level::INFO,
"Successfully logged into Iggy server {}",
diff --git a/core/cli/src/commands/binary_users/create_user.rs
b/core/cli/src/commands/binary_users/create_user.rs
index 1a25459b7..a70d6b9de 100644
--- a/core/cli/src/commands/binary_users/create_user.rs
+++ b/core/cli/src/commands/binary_users/create_user.rs
@@ -23,6 +23,7 @@ use iggy_common::Client;
use iggy_common::Permissions;
use iggy_common::UserStatus;
use iggy_common::create_user::CreateUser;
+use secrecy::{ExposeSecret, SecretString};
use tracing::{Level, event};
pub struct CreateUserCmd {
@@ -39,7 +40,7 @@ impl CreateUserCmd {
Self {
create_user: CreateUser {
username,
- password,
+ password: SecretString::from(password),
status,
permissions,
},
@@ -50,31 +51,28 @@ impl CreateUserCmd {
#[async_trait]
impl CliCommand for CreateUserCmd {
fn explain(&self) -> String {
- format!(
- "create user with username: {} and password: {}",
- self.create_user.username, self.create_user.password
- )
+ format!("create user with username: {}", self.create_user.username)
}
async fn execute_cmd(&mut self, client: &dyn Client) -> anyhow::Result<(),
anyhow::Error> {
client
.create_user(
&self.create_user.username,
- &self.create_user.password,
+ self.create_user.password.expose_secret(),
self.create_user.status,
self.create_user.permissions.clone(),
)
.await
.with_context(|| {
format!(
- "Problem creating user (username: {} and password: {})",
- self.create_user.username, self.create_user.password
+ "Problem creating user (username: {})",
+ self.create_user.username
)
})?;
event!(target: PRINT_TARGET, Level::INFO,
- "User with username: {} and password: {} created",
- self.create_user.username, self.create_user.password
+ "User with username: {} created",
+ self.create_user.username
);
Ok(())
diff --git a/core/cli/src/credentials.rs b/core/cli/src/credentials.rs
index 4760218d3..0a92746c8 100644
--- a/core/cli/src/credentials.rs
+++ b/core/cli/src/credentials.rs
@@ -23,6 +23,7 @@ use iggy::clients::client::IggyClient;
use iggy::prelude::{Args, IggyError, PersonalAccessTokenClient, UserClient};
use iggy_cli::commands::binary_system::session::ServerSession;
use passterm::{Stream, isatty, prompt_password_stdin, prompt_password_tty};
+use secrecy::{ExposeSecret, SecretString};
use std::env::var;
#[cfg(feature = "login-session")]
@@ -40,13 +41,13 @@ static ENV_IGGY_PASSWORD: &str = "IGGY_PASSWORD";
struct IggyUserClient {
username: String,
- password: String,
+ password: SecretString,
}
enum Credentials {
UserNameAndPassword(IggyUserClient),
- PersonalAccessToken(String),
- SessionWithToken(String, String),
+ PersonalAccessToken(SecretString),
+ SessionWithToken(SecretString, String),
}
pub(crate) struct IggyCredentials<'a> {
@@ -73,7 +74,10 @@ impl<'a> IggyCredentials<'a> {
let server_session = ServerSession::new(server_address.clone());
if let Some(token) = server_session.get_token() {
return Ok(Self {
- credentials: Some(Credentials::SessionWithToken(token,
server_address)),
+ credentials: Some(Credentials::SessionWithToken(
+ SecretString::from(token),
+ server_address,
+ )),
iggy_client: None,
login_required,
});
@@ -91,7 +95,9 @@ impl<'a> IggyCredentials<'a> {
let token = entry.get_password()?;
Ok(Self {
- credentials:
Some(Credentials::PersonalAccessToken(token)),
+ credentials:
Some(Credentials::PersonalAccessToken(SecretString::from(
+ token,
+ ))),
iggy_client: None,
login_required,
})
@@ -102,19 +108,22 @@ impl<'a> IggyCredentials<'a> {
if let Some(token) = &cli_options.token {
Ok(Self {
- credentials:
Some(Credentials::PersonalAccessToken(token.clone())),
+ credentials:
Some(Credentials::PersonalAccessToken(SecretString::from(
+ token.clone(),
+ ))),
iggy_client: None,
login_required,
})
} else if let Some(username) = &cli_options.username {
let password = match &cli_options.password {
- Some(password) => password.clone(),
+ Some(password) => SecretString::from(password.clone()),
None => {
- if isatty(Stream::Stdin) {
+ let pwd = if isatty(Stream::Stdin) {
prompt_password_tty(Some("Password: "))?
} else {
prompt_password_stdin(None, Stream::Stdout)?
- }
+ };
+ SecretString::from(pwd)
}
};
@@ -130,7 +139,7 @@ impl<'a> IggyCredentials<'a> {
Ok(Self {
credentials:
Some(Credentials::UserNameAndPassword(IggyUserClient {
username: var(ENV_IGGY_USERNAME)?,
- password: var(ENV_IGGY_PASSWORD)?,
+ password: SecretString::from(var(ENV_IGGY_PASSWORD)?),
})),
iggy_client: None,
login_required,
@@ -154,7 +163,7 @@ impl<'a> IggyCredentials<'a> {
let _ = client
.login_user(
&username_and_password.username,
- &username_and_password.password,
+ username_and_password.password.expose_secret(),
)
.await
.with_context(|| {
@@ -166,14 +175,14 @@ impl<'a> IggyCredentials<'a> {
}
Credentials::PersonalAccessToken(token_value) => {
let _ = client
- .login_with_personal_access_token(token_value)
+
.login_with_personal_access_token(token_value.expose_secret())
.await
- .with_context(|| {
- format!("Problem with server login with token:
{token_value}")
- })?;
+ .with_context(|| "Problem with server login with
token".to_string())?;
}
Credentials::SessionWithToken(token_value, server_address) => {
- let login_result =
client.login_with_personal_access_token(token_value).await;
+ let login_result = client
+
.login_with_personal_access_token(token_value.expose_secret())
+ .await;
if let Err(err) = login_result {
if matches!(
err,
@@ -187,7 +196,7 @@ impl<'a> IggyCredentials<'a> {
"Login session expired for Iggy server:
{server_address}, please login again or use other authentication method"
);
} else {
- bail!("Problem with server login with token:
{token_value}");
+ bail!("Problem with server login with token");
}
}
}
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index 0e1d0bd1c..74d9dab1e 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -55,6 +55,7 @@ papaya = { workspace = true }
rcgen = { workspace = true }
ring = { workspace = true }
rustls = { workspace = true }
+secrecy = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true, features = ["base64"] }
diff --git
a/core/common/src/commands/personal_access_tokens/login_with_personal_access_token.rs
b/core/common/src/commands/personal_access_tokens/login_with_personal_access_token.rs
index 59fafa9c1..73dedb34b 100644
---
a/core/common/src/commands/personal_access_tokens/login_with_personal_access_token.rs
+++
b/core/common/src/commands/personal_access_tokens/login_with_personal_access_token.rs
@@ -22,6 +22,7 @@ use crate::defaults::*;
use crate::error::IggyError;
use crate::{Command, LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE};
use bytes::{BufMut, Bytes, BytesMut};
+use secrecy::{ExposeSecret, SecretString};
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
use std::str::from_utf8;
@@ -29,10 +30,11 @@ use std::str::from_utf8;
/// `LoginWithPersonalAccessToken` command is used to login the user with a
personal access token, instead of the username and password.
/// It has additional payload:
/// - `token` - personal access token
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize)]
pub struct LoginWithPersonalAccessToken {
/// Personal access token
- pub token: String,
+ #[serde(serialize_with = "crate::utils::serde_secret::serialize_secret")]
+ pub token: SecretString,
}
impl Command for LoginWithPersonalAccessToken {
@@ -44,14 +46,15 @@ impl Command for LoginWithPersonalAccessToken {
impl Default for LoginWithPersonalAccessToken {
fn default() -> Self {
LoginWithPersonalAccessToken {
- token: "token".to_string(),
+ token: SecretString::from("token"),
}
}
}
impl Validatable<IggyError> for LoginWithPersonalAccessToken {
fn validate(&self) -> Result<(), IggyError> {
- if self.token.is_empty() || self.token.len() > MAX_PAT_LENGTH {
+ let token = self.token.expose_secret();
+ if token.is_empty() || token.len() > MAX_PAT_LENGTH {
return Err(IggyError::InvalidPersonalAccessToken);
}
@@ -61,10 +64,11 @@ impl Validatable<IggyError> for
LoginWithPersonalAccessToken {
impl BytesSerializable for LoginWithPersonalAccessToken {
fn to_bytes(&self) -> Bytes {
- let mut bytes = BytesMut::with_capacity(5 + self.token.len());
+ let token = self.token.expose_secret();
+ let mut bytes = BytesMut::with_capacity(5 + token.len());
#[allow(clippy::cast_possible_truncation)]
- bytes.put_u8(self.token.len() as u8);
- bytes.put_slice(self.token.as_bytes());
+ bytes.put_u8(token.len() as u8);
+ bytes.put_slice(token.as_bytes());
bytes.freeze()
}
@@ -82,14 +86,16 @@ impl BytesSerializable for LoginWithPersonalAccessToken {
.map_err(|_| IggyError::InvalidUtf8)?
.to_string();
- let command = LoginWithPersonalAccessToken { token };
+ let command = LoginWithPersonalAccessToken {
+ token: SecretString::from(token),
+ };
Ok(command)
}
}
impl Display for LoginWithPersonalAccessToken {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- write!(f, "{}", self.token)
+ write!(f, "******")
}
}
@@ -100,14 +106,14 @@ mod tests {
#[test]
fn should_be_serialized_as_bytes() {
let command = LoginWithPersonalAccessToken {
- token: "test".to_string(),
+ token: SecretString::from("test"),
};
let bytes = command.to_bytes();
let token_length = bytes[0];
let token = from_utf8(&bytes[1..1 + token_length as usize]).unwrap();
assert!(!bytes.is_empty());
- assert_eq!(token, command.token);
+ assert_eq!(token, command.token.expose_secret());
}
#[test]
@@ -148,6 +154,20 @@ mod tests {
assert!(command.is_ok());
let command = command.unwrap();
- assert_eq!(command.token, token);
+ assert_eq!(command.token.expose_secret(), token);
+ }
+
+ #[test]
+ fn should_fail_validation_for_empty_token() {
+ let command = LoginWithPersonalAccessToken {
+ token: SecretString::from(""),
+ };
+ assert!(command.validate().is_err());
+ }
+
+ #[test]
+ fn should_pass_validation_for_valid_token() {
+ let command = LoginWithPersonalAccessToken::default();
+ assert!(command.validate().is_ok());
}
}
diff --git a/core/common/src/commands/users/change_password.rs
b/core/common/src/commands/users/change_password.rs
index ba8e251db..9d5272433 100644
--- a/core/common/src/commands/users/change_password.rs
+++ b/core/common/src/commands/users/change_password.rs
@@ -24,6 +24,7 @@ use crate::Validatable;
use crate::error::IggyError;
use crate::{CHANGE_PASSWORD_CODE, Command};
use bytes::{BufMut, Bytes, BytesMut};
+use secrecy::{ExposeSecret, SecretString};
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use std::str::from_utf8;
@@ -33,15 +34,17 @@ use std::str::from_utf8;
/// - `user_id` - unique user ID (numeric or name).
/// - `current_password` - current password, must be between 3 and 100
characters long.
/// - `new_password` - new password, must be between 3 and 100 characters long.
-#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
+#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ChangePassword {
/// Unique user ID (numeric or name).
#[serde(skip)]
pub user_id: Identifier,
/// Current password, must be between 3 and 100 characters long.
- pub current_password: String,
+ #[serde(serialize_with = "crate::utils::serde_secret::serialize_secret")]
+ pub current_password: SecretString,
/// New password, must be between 3 and 100 characters long.
- pub new_password: String,
+ #[serde(serialize_with = "crate::utils::serde_secret::serialize_secret")]
+ pub new_password: SecretString,
}
impl Command for ChangePassword {
@@ -54,24 +57,26 @@ impl Default for ChangePassword {
fn default() -> Self {
ChangePassword {
user_id: Identifier::default(),
- current_password: "secret".to_string(),
- new_password: "topsecret".to_string(),
+ current_password: SecretString::from("secret"),
+ new_password: SecretString::from("topsecret"),
}
}
}
impl Validatable<IggyError> for ChangePassword {
fn validate(&self) -> Result<(), IggyError> {
- if self.current_password.is_empty()
- || self.current_password.len() > MAX_PASSWORD_LENGTH
- || self.current_password.len() < MIN_PASSWORD_LENGTH
+ let current_password = self.current_password.expose_secret();
+ if current_password.is_empty()
+ || current_password.len() > MAX_PASSWORD_LENGTH
+ || current_password.len() < MIN_PASSWORD_LENGTH
{
return Err(IggyError::InvalidPassword);
}
- if self.new_password.is_empty()
- || self.new_password.len() > MAX_PASSWORD_LENGTH
- || self.new_password.len() < MIN_PASSWORD_LENGTH
+ let new_password = self.new_password.expose_secret();
+ if new_password.is_empty()
+ || new_password.len() > MAX_PASSWORD_LENGTH
+ || new_password.len() < MIN_PASSWORD_LENGTH
{
return Err(IggyError::InvalidPassword);
}
@@ -83,14 +88,16 @@ impl Validatable<IggyError> for ChangePassword {
impl BytesSerializable for ChangePassword {
fn to_bytes(&self) -> Bytes {
let user_id_bytes = self.user_id.to_bytes();
+ let current_password = self.current_password.expose_secret();
+ let new_password = self.new_password.expose_secret();
let mut bytes = BytesMut::new();
bytes.put_slice(&user_id_bytes);
#[allow(clippy::cast_possible_truncation)]
- bytes.put_u8(self.current_password.len() as u8);
- bytes.put_slice(self.current_password.as_bytes());
+ bytes.put_u8(current_password.len() as u8);
+ bytes.put_slice(current_password.as_bytes());
#[allow(clippy::cast_possible_truncation)]
- bytes.put_u8(self.new_password.len() as u8);
- bytes.put_slice(self.new_password.as_bytes());
+ bytes.put_u8(new_password.len() as u8);
+ bytes.put_slice(new_password.as_bytes());
bytes.freeze()
}
@@ -123,8 +130,8 @@ impl BytesSerializable for ChangePassword {
let command = ChangePassword {
user_id,
- current_password,
- new_password,
+ current_password: SecretString::from(current_password),
+ new_password: SecretString::from(new_password),
};
Ok(command)
}
@@ -144,8 +151,8 @@ mod tests {
fn should_be_serialized_as_bytes() {
let command = ChangePassword {
user_id: Identifier::numeric(1).unwrap(),
- current_password: "user".to_string(),
- new_password: "secret".to_string(),
+ current_password: SecretString::from("user"),
+ new_password: SecretString::from("secret"),
};
let bytes = command.to_bytes();
@@ -163,8 +170,8 @@ mod tests {
assert!(!bytes.is_empty());
assert_eq!(user_id, command.user_id);
- assert_eq!(current_password, command.current_password);
- assert_eq!(new_password, command.new_password);
+ assert_eq!(current_password, command.current_password.expose_secret());
+ assert_eq!(new_password, command.new_password.expose_secret());
}
#[test]
@@ -204,7 +211,36 @@ mod tests {
let command = command.unwrap();
assert_eq!(command.user_id, user_id);
- assert_eq!(command.current_password, current_password);
- assert_eq!(command.new_password, new_password);
+ assert_eq!(command.current_password.expose_secret(), current_password);
+ assert_eq!(command.new_password.expose_secret(), new_password);
+ }
+
+ #[test]
+ fn should_fail_validation_for_invalid_current_password() {
+ for password in ["", "ab"] {
+ let command = ChangePassword {
+ current_password: SecretString::from(password),
+ ..ChangePassword::default()
+ };
+ assert!(
+ command.validate().is_err(),
+ "expected validation error for current_password: {password:?}"
+ );
+ }
+ }
+
+ #[test]
+ fn should_fail_validation_for_empty_new_password() {
+ let command = ChangePassword {
+ new_password: SecretString::from(""),
+ ..ChangePassword::default()
+ };
+ assert!(command.validate().is_err());
+ }
+
+ #[test]
+ fn should_pass_validation_for_valid_command() {
+ let command = ChangePassword::default();
+ assert!(command.validate().is_ok());
}
}
diff --git a/core/common/src/commands/users/create_user.rs
b/core/common/src/commands/users/create_user.rs
index 6aa510ad6..3405bd486 100644
--- a/core/common/src/commands/users/create_user.rs
+++ b/core/common/src/commands/users/create_user.rs
@@ -24,6 +24,7 @@ use crate::Validatable;
use crate::error::IggyError;
use crate::{CREATE_USER_CODE, Command};
use bytes::{BufMut, Bytes, BytesMut};
+use secrecy::{ExposeSecret, SecretString};
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use std::str::from_utf8;
@@ -34,12 +35,13 @@ use std::str::from_utf8;
/// - `password` - password of the user, must be between 3 and 100 characters
long.
/// - `status` - status of the user, can be either `active` or `inactive`.
/// - `permissions` - optional permissions of the user. If not provided, user
will have no permissions.
-#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
+#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct CreateUser {
/// Unique name of the user, must be between 3 and 50 characters long.
pub username: String,
/// Password of the user, must be between 3 and 100 characters long.
- pub password: String,
+ #[serde(serialize_with = "crate::utils::serde_secret::serialize_secret")]
+ pub password: SecretString,
/// Status of the user, can be either `active` or `inactive`.
pub status: UserStatus,
/// Optional permissions of the user. If not provided, user will have no
permissions.
@@ -56,7 +58,7 @@ impl Default for CreateUser {
fn default() -> Self {
CreateUser {
username: "user".to_string(),
- password: "secret".to_string(),
+ password: SecretString::from("secret"),
status: UserStatus::Active,
permissions: None,
}
@@ -72,9 +74,10 @@ impl Validatable<IggyError> for CreateUser {
return Err(IggyError::InvalidUsername);
}
- if self.password.is_empty()
- || self.password.len() > MAX_PASSWORD_LENGTH
- || self.password.len() < MIN_PASSWORD_LENGTH
+ let password = self.password.expose_secret();
+ if password.is_empty()
+ || password.len() > MAX_PASSWORD_LENGTH
+ || password.len() < MIN_PASSWORD_LENGTH
{
return Err(IggyError::InvalidPassword);
}
@@ -85,13 +88,14 @@ impl Validatable<IggyError> for CreateUser {
impl BytesSerializable for CreateUser {
fn to_bytes(&self) -> Bytes {
- let mut bytes = BytesMut::with_capacity(2 + self.username.len() +
self.password.len());
+ let password = self.password.expose_secret();
+ let mut bytes = BytesMut::with_capacity(2 + self.username.len() +
password.len());
#[allow(clippy::cast_possible_truncation)]
bytes.put_u8(self.username.len() as u8);
bytes.put_slice(self.username.as_bytes());
#[allow(clippy::cast_possible_truncation)]
- bytes.put_u8(self.password.len() as u8);
- bytes.put_slice(self.password.as_bytes());
+ bytes.put_u8(password.len() as u8);
+ bytes.put_slice(password.as_bytes());
bytes.put_u8(self.status.as_code());
if let Some(permissions) = &self.permissions {
bytes.put_u8(1);
@@ -161,7 +165,7 @@ impl BytesSerializable for CreateUser {
let command = CreateUser {
username,
- password,
+ password: SecretString::from(password),
status,
permissions,
};
@@ -193,7 +197,7 @@ mod tests {
fn should_be_serialized_as_bytes() {
let command = CreateUser {
username: "user".to_string(),
- password: "secret".to_string(),
+ password: SecretString::from("secret"),
status: UserStatus::Active,
permissions: Some(Permissions {
global: GlobalPermissions {
@@ -234,7 +238,7 @@ mod tests {
assert!(!bytes.is_empty());
assert_eq!(username, command.username);
- assert_eq!(password, command.password);
+ assert_eq!(password, command.password.expose_secret());
assert_eq!(status, command.status);
assert_eq!(has_permissions, 1);
assert_eq!(permissions, command.permissions.unwrap());
@@ -306,9 +310,38 @@ mod tests {
let command = command.unwrap();
assert_eq!(command.username, username);
- assert_eq!(command.password, password);
+ assert_eq!(command.password.expose_secret(), password);
assert_eq!(command.status, status);
assert!(command.permissions.is_some());
assert_eq!(command.permissions.unwrap(), permissions);
}
+
+ #[test]
+ fn should_fail_validation_for_empty_username() {
+ let command = CreateUser {
+ username: "".to_string(),
+ ..CreateUser::default()
+ };
+ assert!(command.validate().is_err());
+ }
+
+ #[test]
+ fn should_fail_validation_for_invalid_password() {
+ for password in ["", "ab"] {
+ let command = CreateUser {
+ password: SecretString::from(password),
+ ..CreateUser::default()
+ };
+ assert!(
+ command.validate().is_err(),
+ "expected validation error for password: {password:?}"
+ );
+ }
+ }
+
+ #[test]
+ fn should_pass_validation_for_valid_command() {
+ let command = CreateUser::default();
+ assert!(command.validate().is_ok());
+ }
}
diff --git a/core/common/src/commands/users/login_user.rs
b/core/common/src/commands/users/login_user.rs
index 9fe907f11..ce729d1c0 100644
--- a/core/common/src/commands/users/login_user.rs
+++ b/core/common/src/commands/users/login_user.rs
@@ -22,6 +22,7 @@ use crate::Validatable;
use crate::error::IggyError;
use crate::{Command, LOGIN_USER_CODE};
use bytes::{BufMut, Bytes, BytesMut};
+use secrecy::{ExposeSecret, SecretString};
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use std::str::from_utf8;
@@ -30,12 +31,13 @@ use std::str::from_utf8;
/// It has additional payload:
/// - `username` - username, must be between 3 and 50 characters long.
/// - `password` - password, must be between 3 and 100 characters long.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize)]
pub struct LoginUser {
/// Username, must be between 3 and 50 characters long.
pub username: String,
/// Password, must be between 3 and 100 characters long.
- pub password: String,
+ #[serde(serialize_with = "crate::utils::serde_secret::serialize_secret")]
+ pub password: SecretString,
// Version metadata added by SDK.
pub version: Option<String>,
// Context metadata added by SDK.
@@ -52,7 +54,7 @@ impl Default for LoginUser {
fn default() -> Self {
LoginUser {
username: "user".to_string(),
- password: "secret".to_string(),
+ password: SecretString::from("secret"),
version: None,
context: None,
}
@@ -68,9 +70,10 @@ impl Validatable<IggyError> for LoginUser {
return Err(IggyError::InvalidUsername);
}
- if self.password.is_empty()
- || self.password.len() > MAX_PASSWORD_LENGTH
- || self.password.len() < MIN_PASSWORD_LENGTH
+ let password = self.password.expose_secret();
+ if password.is_empty()
+ || password.len() > MAX_PASSWORD_LENGTH
+ || password.len() < MIN_PASSWORD_LENGTH
{
return Err(IggyError::InvalidPassword);
}
@@ -81,13 +84,14 @@ impl Validatable<IggyError> for LoginUser {
impl BytesSerializable for LoginUser {
fn to_bytes(&self) -> Bytes {
- let mut bytes = BytesMut::with_capacity(2 + self.username.len() +
self.password.len());
+ let password = self.password.expose_secret();
+ let mut bytes = BytesMut::with_capacity(2 + self.username.len() +
password.len());
#[allow(clippy::cast_possible_truncation)]
bytes.put_u8(self.username.len() as u8);
bytes.put_slice(self.username.as_bytes());
#[allow(clippy::cast_possible_truncation)]
- bytes.put_u8(self.password.len() as u8);
- bytes.put_slice(self.password.as_bytes());
+ bytes.put_u8(password.len() as u8);
+ bytes.put_slice(password.as_bytes());
match &self.version {
Some(version) => {
bytes.put_u32_le(version.len() as u32);
@@ -200,7 +204,7 @@ impl BytesSerializable for LoginUser {
let command = LoginUser {
username,
- password,
+ password: SecretString::from(password),
version,
context,
};
@@ -222,7 +226,7 @@ mod tests {
fn should_be_serialized_as_bytes() {
let command = LoginUser {
username: "user".to_string(),
- password: "secret".to_string(),
+ password: SecretString::from("secret"),
version: Some("1.0.0".to_string()),
context: Some("test".to_string()),
};
@@ -253,7 +257,7 @@ mod tests {
assert!(!bytes.is_empty());
assert_eq!(username, command.username);
- assert_eq!(password, command.password);
+ assert_eq!(password, command.password.expose_secret());
assert_eq!(version, command.version);
assert_eq!(context, command.context);
}
@@ -267,7 +271,7 @@ mod tests {
fn from_bytes_should_fail_on_truncated_input() {
let command = LoginUser {
username: "user".to_string(),
- password: "secret".to_string(),
+ password: SecretString::from("secret"),
version: Some("1.0.0".to_string()),
context: Some("test".to_string()),
};
@@ -275,7 +279,7 @@ mod tests {
// Truncate at every position up to (but not including) the version
field.
// Positions within username/password must error; positions at or past
the
// version boundary are valid old-SDK payloads.
- let version_offset = 2 + command.username.len() +
command.password.len();
+ let version_offset = 2 + command.username.len() +
command.password.expose_secret().len();
for i in 0..version_offset {
let truncated = bytes.slice(..i);
assert!(
@@ -332,7 +336,7 @@ mod tests {
let command = LoginUser::from_bytes(bytes.freeze()).unwrap();
assert_eq!(command.username, username);
- assert_eq!(command.password, password);
+ assert_eq!(command.password.expose_secret(), password);
assert_eq!(command.version, None);
assert_eq!(command.context, None);
}
@@ -359,8 +363,37 @@ mod tests {
let command = command.unwrap();
assert_eq!(command.username, username);
- assert_eq!(command.password, password);
+ assert_eq!(command.password.expose_secret(), password);
assert_eq!(command.version, Some(version));
assert_eq!(command.context, Some(context));
}
+
+ #[test]
+ fn should_fail_validation_for_empty_username() {
+ let command = LoginUser {
+ username: "".to_string(),
+ ..LoginUser::default()
+ };
+ assert!(command.validate().is_err());
+ }
+
+ #[test]
+ fn should_fail_validation_for_invalid_password() {
+ for password in ["", "ab"] {
+ let command = LoginUser {
+ password: SecretString::from(password),
+ ..LoginUser::default()
+ };
+ assert!(
+ command.validate().is_err(),
+ "expected validation error for password: {password:?}"
+ );
+ }
+ }
+
+ #[test]
+ fn should_pass_validation_for_valid_command() {
+ let command = LoginUser::default();
+ assert!(command.validate().is_ok());
+ }
}
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index 349704d6c..b1b8aeb09 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -130,6 +130,7 @@ pub use utils::expiry::IggyExpiry;
pub use utils::hash::*;
pub use utils::personal_access_token_expiry::PersonalAccessTokenExpiry;
pub use utils::random_id;
+pub use utils::serde_secret;
pub use utils::text;
pub use utils::timestamp::*;
pub use utils::topic_size::MaxTopicSize;
diff --git a/core/common/src/traits/binary_impls/personal_access_tokens.rs
b/core/common/src/traits/binary_impls/personal_access_tokens.rs
index 1323ac28b..cd90fe6bf 100644
--- a/core/common/src/traits/binary_impls/personal_access_tokens.rs
+++ b/core/common/src/traits/binary_impls/personal_access_tokens.rs
@@ -28,6 +28,7 @@ use crate::{
ClientState, DiagnosticEvent, IdentityInfo, IggyError,
PersonalAccessTokenClient,
PersonalAccessTokenExpiry, PersonalAccessTokenInfo, RawPersonalAccessToken,
};
+use secrecy::SecretString;
#[async_trait::async_trait]
impl<B: BinaryClient> PersonalAccessTokenClient for B {
@@ -67,7 +68,7 @@ impl<B: BinaryClient> PersonalAccessTokenClient for B {
) -> Result<IdentityInfo, IggyError> {
let response = self
.send_with_response(&LoginWithPersonalAccessToken {
- token: token.to_string(),
+ token: SecretString::from(token),
})
.await?;
self.set_state(ClientState::Authenticated).await;
diff --git a/core/common/src/traits/binary_impls/users.rs
b/core/common/src/traits/binary_impls/users.rs
index 9b4edc8f9..dc2fd1fc2 100644
--- a/core/common/src/traits/binary_impls/users.rs
+++ b/core/common/src/traits/binary_impls/users.rs
@@ -32,6 +32,7 @@ use crate::{
ClientState, DiagnosticEvent, Identifier, IdentityInfo, IggyError,
Permissions, UserClient,
UserInfo, UserInfoDetails, UserStatus,
};
+use secrecy::SecretString;
#[async_trait::async_trait]
impl<B: BinaryClient> UserClient for B {
@@ -66,7 +67,7 @@ impl<B: BinaryClient> UserClient for B {
let response = self
.send_with_response(&CreateUser {
username: username.to_string(),
- password: password.to_string(),
+ password: SecretString::from(password),
status,
permissions,
})
@@ -122,8 +123,8 @@ impl<B: BinaryClient> UserClient for B {
fail_if_not_authenticated(self).await?;
self.send_with_response(&ChangePassword {
user_id: user_id.clone(),
- current_password: current_password.to_string(),
- new_password: new_password.to_string(),
+ current_password: SecretString::from(current_password),
+ new_password: SecretString::from(new_password),
})
.await?;
Ok(())
@@ -133,7 +134,7 @@ impl<B: BinaryClient> UserClient for B {
let response = self
.send_with_response(&LoginUser {
username: username.to_string(),
- password: password.to_string(),
+ password: SecretString::from(password),
version: Some(env!("CARGO_PKG_VERSION").to_string()),
context: Some("".to_string()),
})
diff --git a/core/common/src/traits/binary_mapper.rs
b/core/common/src/traits/binary_mapper.rs
index 727f16aa1..f211febaf 100644
--- a/core/common/src/traits/binary_mapper.rs
+++ b/core/common/src/traits/binary_mapper.rs
@@ -24,6 +24,7 @@ use crate::{
Stream, StreamDetails, Topic, TopicDetails, UserInfo, UserInfoDetails,
UserStatus,
};
use bytes::Bytes;
+use secrecy::SecretString;
use std::collections::HashMap;
use std::str::from_utf8;
@@ -469,7 +470,9 @@ pub fn map_raw_pat(payload: Bytes) ->
Result<RawPersonalAccessToken, IggyError>
let token = from_utf8(&payload[1..1 + token_length as usize])
.map_err(|_| IggyError::InvalidUtf8)?
.to_string();
- Ok(RawPersonalAccessToken { token })
+ Ok(RawPersonalAccessToken {
+ token: SecretString::from(token),
+ })
}
pub fn map_client(payload: Bytes) -> Result<ClientInfoDetails, IggyError> {
diff --git a/core/common/src/types/configuration/auth_config/auto_login.rs
b/core/common/src/types/configuration/auth_config/auto_login.rs
index b9d08dd8b..63ff2e541 100644
--- a/core/common/src/types/configuration/auth_config/auto_login.rs
+++ b/core/common/src/types/configuration/auth_config/auto_login.rs
@@ -17,7 +17,7 @@
use crate::Credentials;
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone)]
pub enum AutoLogin {
Disabled,
Enabled(Credentials),
diff --git
a/core/common/src/types/configuration/auth_config/connection_string.rs
b/core/common/src/types/configuration/auth_config/connection_string.rs
index 391633d53..35332474a 100644
--- a/core/common/src/types/configuration/auth_config/connection_string.rs
+++ b/core/common/src/types/configuration/auth_config/connection_string.rs
@@ -17,6 +17,7 @@
*/
use crate::{AutoLogin, ConnectionStringOptions, Credentials, IggyError,
TransportProtocol};
+use secrecy::SecretString;
use std::str::FromStr;
const DEFAULT_CONNECTION_STRING_PREFIX: &str = "iggy://";
@@ -105,7 +106,7 @@ impl<T: ConnectionStringOptions + Default>
ConnectionString<T> {
return Ok(ConnectionString {
server_address: server_address.to_owned(),
auto_login:
AutoLogin::Enabled(Credentials::PersonalAccessToken(
- pat_token.to_owned(),
+ SecretString::from(pat_token),
)),
options: connection_string_options,
});
@@ -115,7 +116,7 @@ impl<T: ConnectionStringOptions + Default>
ConnectionString<T> {
server_address: server_address.to_owned(),
auto_login: AutoLogin::Enabled(Credentials::UsernamePassword(
username.to_owned(),
- password.to_owned(),
+ SecretString::from(password),
)),
options: connection_string_options,
})
@@ -160,6 +161,7 @@ mod tests {
use super::*;
use crate::IggyDuration;
use crate::TcpConnectionStringOptions;
+ use secrecy::ExposeSecret;
#[test]
fn should_fail_without_username() {
@@ -243,13 +245,13 @@ mod tests {
connection_string.server_address,
format!("{server_address}:{port}")
);
- assert_eq!(
- connection_string.auto_login,
- AutoLogin::Enabled(Credentials::UsernamePassword(
- username.to_string(),
- password.to_string()
- ))
- );
+ match &connection_string.auto_login {
+ AutoLogin::Enabled(Credentials::UsernamePassword(u, p)) => {
+ assert_eq!(u, username);
+ assert_eq!(p.expose_secret(), password);
+ }
+ _ => panic!("Expected UsernamePassword credentials"),
+ }
assert!(connection_string.options.retries().is_none());
assert_eq!(
@@ -277,13 +279,13 @@ mod tests {
connection_string.server_address,
format!("{server_address}:{port}")
);
- assert_eq!(
- connection_string.auto_login,
- AutoLogin::Enabled(Credentials::UsernamePassword(
- username.to_string(),
- password.to_string()
- ))
- );
+ match &connection_string.auto_login {
+ AutoLogin::Enabled(Credentials::UsernamePassword(u, p)) => {
+ assert_eq!(u, username);
+ assert_eq!(p.expose_secret(), password);
+ }
+ _ => panic!("Expected UsernamePassword credentials"),
+ }
assert_eq!(connection_string.options.retries().unwrap(), 3);
assert_eq!(
@@ -306,10 +308,12 @@ mod tests {
connection_string.server_address,
format!("{server_address}:{port}")
);
- assert_eq!(
- connection_string.auto_login,
-
AutoLogin::Enabled(Credentials::PersonalAccessToken(pat.to_string()))
- );
+ match &connection_string.auto_login {
+ AutoLogin::Enabled(Credentials::PersonalAccessToken(token)) => {
+ assert_eq!(token.expose_secret(), pat);
+ }
+ _ => panic!("Expected PersonalAccessToken credentials"),
+ }
assert!(connection_string.options.retries().is_none());
assert_eq!(
diff --git a/core/common/src/types/configuration/auth_config/credentials.rs
b/core/common/src/types/configuration/auth_config/credentials.rs
index ca6e00f67..61e291c6e 100644
--- a/core/common/src/types/configuration/auth_config/credentials.rs
+++ b/core/common/src/types/configuration/auth_config/credentials.rs
@@ -15,8 +15,27 @@
// specific language governing permissions and limitations
// under the License.
-#[derive(Debug, Clone, PartialEq)]
+use secrecy::SecretString;
+use std::fmt;
+
+#[derive(Clone)]
pub enum Credentials {
- UsernamePassword(String, String),
- PersonalAccessToken(String),
+ UsernamePassword(String, SecretString),
+ PersonalAccessToken(SecretString),
+}
+
+impl fmt::Debug for Credentials {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ Credentials::UsernamePassword(username, _) => f
+ .debug_tuple("UsernamePassword")
+ .field(username)
+ .field(&"[REDACTED]")
+ .finish(),
+ Credentials::PersonalAccessToken(_) => f
+ .debug_tuple("PersonalAccessToken")
+ .field(&"[REDACTED]")
+ .finish(),
+ }
+ }
}
diff --git a/core/common/src/types/permissions/personal_access_token.rs
b/core/common/src/types/permissions/personal_access_token.rs
index 0837ba17c..6fc5be45c 100644
--- a/core/common/src/types/permissions/personal_access_token.rs
+++ b/core/common/src/types/permissions/personal_access_token.rs
@@ -16,16 +16,28 @@
* under the License.
*/
+use crate::utils::serde_secret::serialize_secret;
use crate::utils::timestamp::IggyTimestamp;
+use secrecy::SecretString;
use serde::{Deserialize, Serialize};
+use std::fmt;
/// `RawPersonalAccessToken` represents the raw personal access token - the
secured token which is returned only once during the creation.
/// It consists of the following fields:
/// - `token`: the unique token that should be securely stored by the user and
can be used for authentication.
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Serialize, Deserialize)]
pub struct RawPersonalAccessToken {
/// The unique token that should be securely stored by the user and can be
used for authentication.
- pub token: String,
+ #[serde(serialize_with = "serialize_secret")]
+ pub token: SecretString,
+}
+
+impl fmt::Debug for RawPersonalAccessToken {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("RawPersonalAccessToken")
+ .field("token", &"[REDACTED]")
+ .finish()
+ }
}
/// `PersonalAccessToken` represents the personal access token. It does not
contain the token itself, but the information about the token.
diff --git a/core/common/src/types/user/user_identity_info.rs
b/core/common/src/types/user/user_identity_info.rs
index 64005eec2..fef4946d3 100644
--- a/core/common/src/types/user/user_identity_info.rs
+++ b/core/common/src/types/user/user_identity_info.rs
@@ -17,7 +17,10 @@
*/
use crate::UserId;
+use crate::utils::serde_secret::serialize_secret;
+use secrecy::SecretString;
use serde::{Deserialize, Serialize};
+use std::fmt;
/// `IdentityInfo` represents the information about an identity.
/// It consists of the following fields:
@@ -35,10 +38,20 @@ pub struct IdentityInfo {
/// It consists of the following fields:
/// - `token`: the value of token.
/// - `expiry`: the expiry of token.
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Serialize, Deserialize)]
pub struct TokenInfo {
/// The value of token.
- pub token: String,
+ #[serde(serialize_with = "serialize_secret")]
+ pub token: SecretString,
/// The expiry of token.
pub expiry: u64,
}
+
+impl fmt::Debug for TokenInfo {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("TokenInfo")
+ .field("token", &"[REDACTED]")
+ .field("expiry", &self.expiry)
+ .finish()
+ }
+}
diff --git a/core/common/src/utils/mod.rs b/core/common/src/utils/mod.rs
index 00793cad1..0a300136e 100644
--- a/core/common/src/utils/mod.rs
+++ b/core/common/src/utils/mod.rs
@@ -25,6 +25,7 @@ pub(crate) mod expiry;
pub(crate) mod hash;
pub(crate) mod personal_access_token_expiry;
pub mod random_id;
+pub mod serde_secret;
pub mod text;
pub(crate) mod timestamp;
pub(crate) mod topic_size;
diff --git a/core/common/src/utils/serde_secret.rs
b/core/common/src/utils/serde_secret.rs
new file mode 100644
index 000000000..8c6a7700c
--- /dev/null
+++ b/core/common/src/utils/serde_secret.rs
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Serde serialization helpers for `SecretString` fields.
+//!
+//! `SecretString` intentionally does not implement `Serialize` to prevent
+//! accidental secret exposure. These helpers are for fields that **must** be
+//! serialized (e.g., wire protocol payloads, persisted TOML configs, API
+//! responses that already expose credentials by design).
+//!
+//! Usage:
+//! ```ignore
+//! #[serde(serialize_with = "crate::utils::serde_secret::serialize_secret")]
+//! pub password: SecretString,
+//! ```
+//!
+//! Do **not** add `serialize_with` to fields that should remain redacted in
+//! serialized output — rely on `SecretString`'s default behavior instead.
+
+use secrecy::{ExposeSecret, SecretString};
+
+pub fn serialize_secret<S: serde::Serializer>(
+ secret: &SecretString,
+ serializer: S,
+) -> Result<S::Ok, S::Error> {
+ serializer.serialize_str(secret.expose_secret())
+}
+
+pub fn serialize_optional_secret<S: serde::Serializer>(
+ secret: &Option<SecretString>,
+ serializer: S,
+) -> Result<S::Ok, S::Error> {
+ match secret {
+ Some(s) => serializer.serialize_some(s.expose_secret()),
+ None => serializer.serialize_none(),
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use serde::{Deserialize, Serialize};
+
+ #[derive(Serialize, Deserialize)]
+ struct WithSecret {
+ #[serde(serialize_with = "serialize_secret")]
+ password: SecretString,
+ }
+
+ #[derive(Serialize, Deserialize)]
+ struct WithOptionalSecret {
+ #[serde(serialize_with = "serialize_optional_secret")]
+ token: Option<SecretString>,
+ }
+
+ #[test]
+ fn serialize_secret_preserves_value_in_json() {
+ let s = WithSecret {
+ password: SecretString::from("my_password"),
+ };
+ let json = serde_json::to_string(&s).unwrap();
+ assert_eq!(json, r#"{"password":"my_password"}"#);
+ }
+
+ #[test]
+ fn serialize_secret_roundtrips_through_json() {
+ let original = WithSecret {
+ password: SecretString::from("roundtrip"),
+ };
+ let json = serde_json::to_string(&original).unwrap();
+ let restored: WithSecret = serde_json::from_str(&json).unwrap();
+ assert_eq!(restored.password.expose_secret(), "roundtrip");
+ }
+
+ #[test]
+ fn serialize_optional_secret_with_some_value() {
+ let s = WithOptionalSecret {
+ token: Some(SecretString::from("tok_123")),
+ };
+ let json = serde_json::to_string(&s).unwrap();
+ assert_eq!(json, r#"{"token":"tok_123"}"#);
+ }
+
+ #[test]
+ fn serialize_optional_secret_with_none() {
+ let s = WithOptionalSecret { token: None };
+ let json = serde_json::to_string(&s).unwrap();
+ assert_eq!(json, r#"{"token":null}"#);
+ }
+}
diff --git a/core/connectors/runtime/Cargo.toml
b/core/connectors/runtime/Cargo.toml
index cee58dd17..20d42deec 100644
--- a/core/connectors/runtime/Cargo.toml
+++ b/core/connectors/runtime/Cargo.toml
@@ -60,6 +60,7 @@ reqwest = { workspace = true }
reqwest-middleware = { workspace = true }
reqwest-retry = { workspace = true }
reqwest-tracing = { workspace = true }
+secrecy = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
diff --git a/core/connectors/runtime/src/api/auth.rs
b/core/connectors/runtime/src/api/auth.rs
index 6f4439af1..57ccf207a 100644
--- a/core/connectors/runtime/src/api/auth.rs
+++ b/core/connectors/runtime/src/api/auth.rs
@@ -24,6 +24,7 @@ use axum::{
middleware::Next,
response::Response,
};
+use secrecy::ExposeSecret;
use std::sync::Arc;
const API_KEY_HEADER: &str = "api-key";
@@ -38,7 +39,7 @@ pub async fn resolve_api_key(
return Ok(next.run(request).await);
}
- if context.api_key.is_empty() {
+ if context.api_key.expose_secret().is_empty() {
return Ok(next.run(request).await);
};
@@ -50,7 +51,7 @@ pub async fn resolve_api_key(
return Err(StatusCode::UNAUTHORIZED);
};
- if api_key != context.api_key {
+ if api_key != context.api_key.expose_secret() {
return Err(StatusCode::UNAUTHORIZED);
}
diff --git a/core/connectors/runtime/src/api/config.rs
b/core/connectors/runtime/src/api/config.rs
index 2b5bc009a..72b48f9a0 100644
--- a/core/connectors/runtime/src/api/config.rs
+++ b/core/connectors/runtime/src/api/config.rs
@@ -21,6 +21,8 @@ use crate::configs::connectors::ConfigFormat;
use crate::error::RuntimeError;
use axum::http::{HeaderValue, Method};
use configs_derive::ConfigEnv;
+use iggy_common::serde_secret::serialize_secret;
+use secrecy::SecretString;
use serde::{Deserialize, Serialize};
use std::fmt::Formatter;
use tower_http::cors::{AllowOrigin, CorsLayer};
@@ -31,17 +33,31 @@ pub const YAML_HEADER: HeaderValue =
HeaderValue::from_static("application/yaml"
pub const TOML_HEADER: HeaderValue =
HeaderValue::from_static("application/toml");
pub const TEXT_HEADER: HeaderValue = HeaderValue::from_static("text/plain");
-#[derive(Debug, Clone, Deserialize, Serialize, ConfigEnv)]
+#[derive(Clone, Deserialize, Serialize, ConfigEnv)]
pub struct HttpConfig {
pub enabled: bool,
pub address: String,
- #[config_env(secret)]
- pub api_key: String,
+ #[config_env(secret, leaf)]
+ #[serde(serialize_with = "serialize_secret")]
+ pub api_key: SecretString,
pub cors: HttpCorsConfig,
pub tls: HttpTlsConfig,
pub metrics: HttpMetricsConfig,
}
+impl std::fmt::Debug for HttpConfig {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("HttpConfig")
+ .field("enabled", &self.enabled)
+ .field("address", &self.address)
+ .field("api_key", &"[REDACTED]")
+ .field("cors", &self.cors)
+ .field("tls", &self.tls)
+ .field("metrics", &self.metrics)
+ .finish()
+ }
+}
+
#[derive(Debug, Clone, Deserialize, Serialize, ConfigEnv)]
pub struct HttpMetricsConfig {
pub enabled: bool,
@@ -166,7 +182,7 @@ impl Default for HttpConfig {
Self {
enabled: true,
address: "localhost:8081".to_owned(),
- api_key: "".to_owned(),
+ api_key: SecretString::from(""),
cors: HttpCorsConfig::default(),
tls: HttpTlsConfig::default(),
metrics: HttpMetricsConfig::default(),
@@ -178,8 +194,8 @@ impl std::fmt::Display for HttpConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
- "{{ address: {}, api_key: {}, cors: {}, tls: {}, metrics: {} }}",
- self.address, self.api_key, self.cors, self.tls, self.metrics
+ "{{ address: {}, api_key: ******, cors: {}, tls: {}, metrics: {}
}}",
+ self.address, self.cors, self.tls, self.metrics
)
}
}
diff --git a/core/connectors/runtime/src/context.rs
b/core/connectors/runtime/src/context.rs
index a87095463..a15d4bea6 100644
--- a/core/connectors/runtime/src/context.rs
+++ b/core/connectors/runtime/src/context.rs
@@ -30,6 +30,7 @@ use crate::{
use iggy_common::IggyTimestamp;
use iggy_connector_sdk::api::ConnectorError;
use iggy_connector_sdk::api::ConnectorStatus;
+use secrecy::SecretString;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
@@ -38,7 +39,7 @@ use tracing::error;
pub struct RuntimeContext {
pub sinks: SinkManager,
pub sources: SourceManager,
- pub api_key: String,
+ pub api_key: SecretString,
pub config_provider: Arc<dyn ConnectorsConfigProvider>,
pub metrics: Arc<Metrics>,
pub start_time: IggyTimestamp,
@@ -67,7 +68,7 @@ pub fn init(
RuntimeContext {
sinks,
sources,
- api_key: config.http.api_key.to_owned(),
+ api_key: config.http.api_key.clone(),
config_provider: Arc::from(config_provider),
metrics,
start_time: IggyTimestamp::now(),
diff --git a/core/connectors/sinks/elasticsearch_sink/Cargo.toml
b/core/connectors/sinks/elasticsearch_sink/Cargo.toml
index 2598fcc8f..40923ea9d 100644
--- a/core/connectors/sinks/elasticsearch_sink/Cargo.toml
+++ b/core/connectors/sinks/elasticsearch_sink/Cargo.toml
@@ -42,6 +42,7 @@ elasticsearch = { workspace = true }
iggy_common = { workspace = true }
iggy_connector_sdk = { workspace = true }
once_cell = { workspace = true }
+secrecy = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
simd-json = { workspace = true }
diff --git a/core/connectors/sinks/elasticsearch_sink/src/lib.rs
b/core/connectors/sinks/elasticsearch_sink/src/lib.rs
index b9f17d6e8..36c76935c 100644
--- a/core/connectors/sinks/elasticsearch_sink/src/lib.rs
+++ b/core/connectors/sinks/elasticsearch_sink/src/lib.rs
@@ -28,6 +28,7 @@ use iggy_common::IggyTimestamp;
use iggy_connector_sdk::{
ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata,
sink_connector,
};
+use secrecy::{ExposeSecret, SecretString};
use serde::{Deserialize, Serialize};
use serde_json::json;
use simd_json::{OwnedValue, prelude::*};
@@ -73,7 +74,8 @@ pub struct ElasticsearchSinkConfig {
pub url: String,
pub index: String,
pub username: Option<String>,
- pub password: Option<String>,
+ #[serde(serialize_with =
"iggy_common::serde_secret::serialize_optional_secret")]
+ pub password: Option<SecretString>,
pub batch_size: Option<usize>,
pub timeout_seconds: Option<u64>,
pub create_index_if_not_exists: Option<bool>,
@@ -110,7 +112,8 @@ impl ElasticsearchSink {
let mut transport_builder = TransportBuilder::new(conn_pool);
if let (Some(username), Some(password)) = (&self.config.username,
&self.config.password) {
- let credentials = Credentials::Basic(username.clone(),
password.clone());
+ let credentials =
+ Credentials::Basic(username.clone(),
password.expose_secret().to_string());
transport_builder = transport_builder.auth(credentials);
}
diff --git a/core/connectors/sinks/mongodb_sink/Cargo.toml
b/core/connectors/sinks/mongodb_sink/Cargo.toml
index 4acbf7a38..0a4305f2f 100644
--- a/core/connectors/sinks/mongodb_sink/Cargo.toml
+++ b/core/connectors/sinks/mongodb_sink/Cargo.toml
@@ -34,8 +34,10 @@ crate-type = ["cdylib", "lib"]
[dependencies]
async-trait = { workspace = true }
humantime = { workspace = true }
+iggy_common = { workspace = true }
iggy_connector_sdk = { workspace = true }
mongodb = { workspace = true }
+secrecy = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
diff --git a/core/connectors/sinks/mongodb_sink/src/lib.rs
b/core/connectors/sinks/mongodb_sink/src/lib.rs
index 34b01ca7b..2a4e89101 100644
--- a/core/connectors/sinks/mongodb_sink/src/lib.rs
+++ b/core/connectors/sinks/mongodb_sink/src/lib.rs
@@ -22,6 +22,7 @@ use iggy_connector_sdk::{
ConsumedMessage, Error, MessagesMetadata, Sink, TopicMetadata,
sink_connector,
};
use mongodb::{Client, Collection, bson, options::ClientOptions};
+use secrecy::{ExposeSecret, SecretString};
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
@@ -52,7 +53,8 @@ pub struct MongoDbSink {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MongoDbSinkConfig {
- pub connection_uri: String,
+ #[serde(serialize_with = "iggy_common::serde_secret::serialize_secret")]
+ pub connection_uri: SecretString,
pub database: String,
pub collection: String,
pub max_pool_size: Option<u32>,
@@ -169,11 +171,11 @@ impl Sink for MongoDbSink {
impl MongoDbSink {
/// Build a MongoDB client using ClientOptions so max_pool_size can be
applied.
async fn connect(&mut self) -> Result<(), Error> {
- let redacted = redact_connection_uri(&self.config.connection_uri);
+ let redacted =
redact_connection_uri(self.config.connection_uri.expose_secret());
info!("Connecting to MongoDB: {redacted}");
- let mut options = ClientOptions::parse(&self.config.connection_uri)
+ let mut options =
ClientOptions::parse(self.config.connection_uri.expose_secret())
.await
.map_err(|e| Error::InitError(format!("Failed to parse connection
URI: {e}")))?;
@@ -598,7 +600,7 @@ mod tests {
fn given_default_config() -> MongoDbSinkConfig {
MongoDbSinkConfig {
- connection_uri: "mongodb://localhost:27017".to_string(),
+ connection_uri: SecretString::from("mongodb://localhost:27017"),
database: "test_db".to_string(),
collection: "test_collection".to_string(),
max_pool_size: None,
diff --git a/core/connectors/sinks/postgres_sink/Cargo.toml
b/core/connectors/sinks/postgres_sink/Cargo.toml
index fb992f1eb..56ab685a3 100644
--- a/core/connectors/sinks/postgres_sink/Cargo.toml
+++ b/core/connectors/sinks/postgres_sink/Cargo.toml
@@ -42,6 +42,7 @@ humantime = { workspace = true }
iggy_common = { workspace = true }
iggy_connector_sdk = { workspace = true }
once_cell = { workspace = true }
+secrecy = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
simd-json = { workspace = true }
diff --git a/core/connectors/sinks/postgres_sink/src/lib.rs
b/core/connectors/sinks/postgres_sink/src/lib.rs
index 2cbdcd605..e45572a72 100644
--- a/core/connectors/sinks/postgres_sink/src/lib.rs
+++ b/core/connectors/sinks/postgres_sink/src/lib.rs
@@ -22,6 +22,7 @@ use iggy_common::{DateTime, Utc};
use iggy_connector_sdk::{
ConsumedMessage, Error, MessagesMetadata, Sink, TopicMetadata,
sink_connector,
};
+use secrecy::{ExposeSecret, SecretString};
use serde::{Deserialize, Serialize};
use sqlx::postgres::PgPoolOptions;
use sqlx::{Pool, Postgres};
@@ -47,7 +48,8 @@ pub struct PostgresSink {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PostgresSinkConfig {
- pub connection_string: String,
+ #[serde(serialize_with = "iggy_common::serde_secret::serialize_secret")]
+ pub connection_string: SecretString,
pub target_table: String,
pub batch_size: Option<u32>,
pub max_connections: Option<u32>,
@@ -159,13 +161,13 @@ impl Sink for PostgresSink {
impl PostgresSink {
async fn connect(&mut self) -> Result<(), Error> {
let max_connections = self.config.max_connections.unwrap_or(10);
- let redacted =
redact_connection_string(&self.config.connection_string);
+ let redacted =
redact_connection_string(self.config.connection_string.expose_secret());
info!("Connecting to PostgreSQL with max {max_connections}
connections: {redacted}");
let pool = PgPoolOptions::new()
.max_connections(max_connections)
- .connect(&self.config.connection_string)
+ .connect(self.config.connection_string.expose_secret())
.await
.map_err(|e| Error::InitError(format!("Failed to connect to
PostgreSQL: {e}")))?;
@@ -546,7 +548,7 @@ mod tests {
fn test_config() -> PostgresSinkConfig {
PostgresSinkConfig {
- connection_string: "postgres://localhost/db".to_string(),
+ connection_string: SecretString::from("postgres://localhost/db"),
target_table: "messages".to_string(),
batch_size: Some(100),
max_connections: None,
diff --git a/core/connectors/sources/elasticsearch_source/Cargo.toml
b/core/connectors/sources/elasticsearch_source/Cargo.toml
index f705bde7f..d1bb9c654 100644
--- a/core/connectors/sources/elasticsearch_source/Cargo.toml
+++ b/core/connectors/sources/elasticsearch_source/Cargo.toml
@@ -42,6 +42,7 @@ humantime = { workspace = true }
iggy_common = { workspace = true }
iggy_connector_sdk = { workspace = true }
once_cell = { workspace = true }
+secrecy = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
simd-json = { workspace = true }
diff --git a/core/connectors/sources/elasticsearch_source/src/lib.rs
b/core/connectors/sources/elasticsearch_source/src/lib.rs
index fc934847b..42466d15b 100644
--- a/core/connectors/sources/elasticsearch_source/src/lib.rs
+++ b/core/connectors/sources/elasticsearch_source/src/lib.rs
@@ -26,6 +26,7 @@ use iggy_common::{DateTime, Utc};
use iggy_connector_sdk::{
ConnectorState, Error, ProducedMessage, ProducedMessages, Schema, Source,
source_connector,
};
+use secrecy::{ExposeSecret, SecretString};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use std::str::FromStr;
@@ -95,7 +96,8 @@ pub struct ElasticsearchSourceConfig {
pub url: String,
pub index: String,
pub username: Option<String>,
- pub password: Option<String>,
+ #[serde(serialize_with =
"iggy_common::serde_secret::serialize_optional_secret")]
+ pub password: Option<SecretString>,
pub query: Option<Value>,
pub polling_interval: Option<String>,
pub batch_size: Option<usize>,
@@ -303,7 +305,8 @@ impl ElasticsearchSource {
let mut transport_builder = TransportBuilder::new(conn_pool);
if let (Some(username), Some(password)) = (&self.config.username,
&self.config.password) {
- let credentials = Credentials::Basic(username.clone(),
password.clone());
+ let credentials =
+ Credentials::Basic(username.clone(),
password.expose_secret().to_string());
transport_builder = transport_builder.auth(credentials);
}
diff --git a/core/connectors/sources/postgres_source/Cargo.toml
b/core/connectors/sources/postgres_source/Cargo.toml
index 2b0969901..bbc82bed2 100644
--- a/core/connectors/sources/postgres_source/Cargo.toml
+++ b/core/connectors/sources/postgres_source/Cargo.toml
@@ -47,6 +47,7 @@ humantime = { workspace = true }
iggy_common = { workspace = true }
iggy_connector_sdk = { workspace = true }
once_cell = { workspace = true }
+secrecy = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
simd-json = { workspace = true }
diff --git a/core/connectors/sources/postgres_source/src/lib.rs
b/core/connectors/sources/postgres_source/src/lib.rs
index 798c4b9e9..f01bcba3f 100644
--- a/core/connectors/sources/postgres_source/src/lib.rs
+++ b/core/connectors/sources/postgres_source/src/lib.rs
@@ -23,6 +23,7 @@ use iggy_common::{DateTime, Utc};
use iggy_connector_sdk::{
ConnectorState, Error, ProducedMessage, ProducedMessages, Schema, Source,
source_connector,
};
+use secrecy::{ExposeSecret, SecretString};
use serde::{Deserialize, Serialize};
use sqlx::postgres::PgPoolOptions;
use sqlx::{Column, Pool, Postgres, Row, TypeInfo};
@@ -51,7 +52,8 @@ pub struct PostgresSource {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PostgresSourceConfig {
- pub connection_string: String,
+ #[serde(serialize_with = "iggy_common::serde_secret::serialize_secret")]
+ pub connection_string: SecretString,
pub mode: String,
pub tables: Vec<String>,
pub poll_interval: Option<String>,
@@ -263,13 +265,13 @@ impl Source for PostgresSource {
impl PostgresSource {
async fn connect(&mut self) -> Result<(), Error> {
let max_connections = self.config.max_connections.unwrap_or(10);
- let redacted =
redact_connection_string(&self.config.connection_string);
+ let redacted =
redact_connection_string(self.config.connection_string.expose_secret());
info!("Connecting to PostgreSQL with max {max_connections}
connections: {redacted}");
let pool = PgPoolOptions::new()
.max_connections(max_connections)
- .connect(&self.config.connection_string)
+ .connect(self.config.connection_string.expose_secret())
.await
.map_err(|e| Error::InitError(format!("Failed to connect to
PostgreSQL: {e}")))?;
@@ -1143,7 +1145,7 @@ mod tests {
fn test_config() -> PostgresSourceConfig {
PostgresSourceConfig {
- connection_string: "postgres://localhost/db".to_string(),
+ connection_string: SecretString::from("postgres://localhost/db"),
mode: "polling".to_string(),
tables: vec!["users".to_string()],
poll_interval: Some("5s".to_string()),
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index b0cf3baf9..9851e5be0 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -61,6 +61,7 @@ rmcp = { workspace = true, features = [
"transport-streamable-http-client",
"transport-streamable-http-client-reqwest",
] }
+secrecy = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serial_test = { workspace = true }
diff --git
a/core/integration/tests/cli/personal_access_token/test_pat_login_options.rs
b/core/integration/tests/cli/personal_access_token/test_pat_login_options.rs
index 144163998..2709c5254 100644
--- a/core/integration/tests/cli/personal_access_token/test_pat_login_options.rs
+++ b/core/integration/tests/cli/personal_access_token/test_pat_login_options.rs
@@ -23,6 +23,7 @@ use iggy::prelude::Client;
use iggy::prelude::PersonalAccessTokenExpiry;
use keyring::Entry;
use predicates::str::{contains, starts_with};
+use secrecy::ExposeSecret;
use serial_test::parallel;
use std::fmt::{Display, Formatter, Result};
@@ -88,11 +89,11 @@ impl IggyCmdTestCase for TestLoginOptions {
.await;
assert!(token.is_ok());
let token = token.unwrap();
- let token_value = token.token.clone();
- self.token_value = Some(token.token);
+ let token_value = token.token.expose_secret().to_owned();
self.keyring
- .set_password(token_value.as_str())
+ .set_password(&token_value)
.expect("Failed to set token");
+ self.token_value = Some(token_value);
}
fn get_command(&self) -> IggyCmdCommand {
diff --git a/core/integration/tests/cli/user/test_user_create_command.rs
b/core/integration/tests/cli/user/test_user_create_command.rs
index f880cf46a..310485a3d 100644
--- a/core/integration/tests/cli/user/test_user_create_command.rs
+++ b/core/integration/tests/cli/user/test_user_create_command.rs
@@ -101,10 +101,10 @@ impl IggyCmdTestCase for TestUserCreateCmd {
}
fn verify_command(&self, command_state: Assert) {
- command_state
- .success()
- .stdout(diff(format!("Executing create user with username: {} and
password: {}\nUser with username: {} and password: {} created\n",
- self.username, self.password,
self.username, self.password)));
+ command_state.success().stdout(diff(format!(
+ "Executing create user with username: {}\nUser with username: {}
created\n",
+ self.username, self.username
+ )));
}
async fn verify_server_state(&self, client: &dyn Client) {
diff --git a/core/integration/tests/mcp/mod.rs
b/core/integration/tests/mcp/mod.rs
index 2d08b4b4f..61560bb8c 100644
--- a/core/integration/tests/mcp/mod.rs
+++ b/core/integration/tests/mcp/mod.rs
@@ -31,6 +31,7 @@ use rmcp::{
serde::de::DeserializeOwned,
serde_json::{self, json},
};
+use secrecy::ExposeSecret;
async fn invoke<T: DeserializeOwned>(
client: &McpClient,
@@ -512,7 +513,7 @@ async fn should_create_personal_access_token(harness:
&TestHarness) {
)
.await;
- assert!(!token.token.is_empty());
+ assert!(!token.token.expose_secret().is_empty());
}
#[iggy_harness(server(mcp), seed = seeds::mcp_standard)]
diff --git a/core/integration/tests/server/scenarios/authentication_scenario.rs
b/core/integration/tests/server/scenarios/authentication_scenario.rs
index 4dc440c4f..f7b08b520 100644
--- a/core/integration/tests/server/scenarios/authentication_scenario.rs
+++ b/core/integration/tests/server/scenarios/authentication_scenario.rs
@@ -30,6 +30,7 @@ use crate::server::scenarios::create_client;
use bytes::Bytes;
use iggy::prelude::*;
use integration::harness::{TestHarness, login_root};
+use secrecy::ExposeSecret;
use server::binary::command::ServerCommand;
use strum::IntoEnumIterator;
@@ -90,7 +91,7 @@ pub async fn run(harness: &TestHarness) {
);
let identity = client
- .login_with_personal_access_token(&raw_pat.token)
+ .login_with_personal_access_token(raw_pat.token.expose_secret())
.await
.expect("PAT login should work");
assert_eq!(identity.user_id, 0, "PAT should authenticate as root");
diff --git
a/core/integration/tests/server/scenarios/cross_protocol_pat_scenario.rs
b/core/integration/tests/server/scenarios/cross_protocol_pat_scenario.rs
index bedfcfe67..130c557a6 100644
--- a/core/integration/tests/server/scenarios/cross_protocol_pat_scenario.rs
+++ b/core/integration/tests/server/scenarios/cross_protocol_pat_scenario.rs
@@ -24,6 +24,7 @@ use iggy::prelude::*;
use iggy_common::TransportProtocol;
use integration::harness::TestHarness;
use integration::iggy_harness;
+use secrecy::ExposeSecret;
const PAT_NAME: &str = "cross-protocol-test-pat";
const TCP_CLIENT_COUNT: usize = 20;
@@ -48,7 +49,7 @@ pub async fn
should_see_pat_created_via_http_when_listing_via_tcp(harness: &Test
.await
.expect("Failed to create PAT via HTTP");
- assert!(!created_pat.token.is_empty());
+ assert!(!created_pat.token.expose_secret().is_empty());
let http_pats = http_client
.get_personal_access_tokens()
@@ -107,7 +108,7 @@ pub async fn
should_see_pat_created_via_tcp_when_listing_via_http(harness: &Test
.await
.expect("Failed to create PAT via TCP");
- assert!(!created_pat.token.is_empty());
+ assert!(!created_pat.token.expose_secret().is_empty());
let http_client = create_root_client(harness,
TransportProtocol::Http).await;
diff --git a/core/integration/tests/server/scenarios/purge_delete_scenario.rs
b/core/integration/tests/server/scenarios/purge_delete_scenario.rs
index 2e971be40..211f3c5fc 100644
--- a/core/integration/tests/server/scenarios/purge_delete_scenario.rs
+++ b/core/integration/tests/server/scenarios/purge_delete_scenario.rs
@@ -19,6 +19,7 @@ use bytes::Bytes;
use iggy::prelude::*;
use iggy_common::Credentials;
use integration::harness::TestHarness;
+use secrecy::SecretString;
use std::fs::{metadata, read_dir};
use std::path::Path;
use std::str::FromStr;
@@ -961,7 +962,7 @@ fn build_root_client(harness: &TestHarness) -> IggyClient {
.with_server_address(addr.to_string())
.with_auto_sign_in(AutoLogin::Enabled(Credentials::UsernamePassword(
DEFAULT_ROOT_USERNAME.to_string(),
- DEFAULT_ROOT_PASSWORD.to_string(),
+ SecretString::from(DEFAULT_ROOT_PASSWORD),
)))
.with_reconnection_max_retries(Some(10))
.with_reconnection_interval(interval)
diff --git
a/core/integration/tests/server/scenarios/stale_client_consumer_group_scenario.rs
b/core/integration/tests/server/scenarios/stale_client_consumer_group_scenario.rs
index e6fc95650..b3b83fff5 100644
---
a/core/integration/tests/server/scenarios/stale_client_consumer_group_scenario.rs
+++
b/core/integration/tests/server/scenarios/stale_client_consumer_group_scenario.rs
@@ -20,6 +20,7 @@ use futures::StreamExt;
use iggy::prelude::*;
use iggy_common::Credentials;
use integration::iggy_harness;
+use secrecy::SecretString;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
@@ -50,7 +51,7 @@ async fn create_reconnecting_client(server_addr: &str) ->
IggyClient {
nodelay: true,
auto_login: AutoLogin::Enabled(Credentials::UsernamePassword(
DEFAULT_ROOT_USERNAME.to_string(),
- DEFAULT_ROOT_PASSWORD.to_string(),
+ SecretString::from(DEFAULT_ROOT_PASSWORD),
)),
reconnection: TcpClientReconnectionConfig {
enabled: true,
diff --git a/core/integration/tests/server/scenarios/user_scenario.rs
b/core/integration/tests/server/scenarios/user_scenario.rs
index f582711b3..d1904d6c1 100644
--- a/core/integration/tests/server/scenarios/user_scenario.rs
+++ b/core/integration/tests/server/scenarios/user_scenario.rs
@@ -24,6 +24,7 @@ use iggy::prelude::defaults::DEFAULT_ROOT_USERNAME;
use iggy::prelude::{GlobalPermissions, Permissions};
use iggy::prelude::{PersonalAccessTokenClient, SEC_IN_MICRO, SystemClient,
UserClient};
use integration::harness::{TestHarness, assert_clean_system, login_root};
+use secrecy::ExposeSecret;
pub async fn run(harness: &TestHarness) {
let client = create_client(harness).await;
@@ -146,14 +147,14 @@ pub async fn run(harness: &TestHarness) {
.await
.unwrap();
- assert!(!raw_pat1.token.is_empty());
+ assert!(!raw_pat1.token.expose_secret().is_empty());
let raw_pat2 = client
.create_personal_access_token(pat_name2,
PersonalAccessTokenExpiry::NeverExpire)
.await
.unwrap();
- assert!(!raw_pat2.token.is_empty());
+ assert!(!raw_pat2.token.expose_secret().is_empty());
// 14. Get personal access tokens and verify that the token is there
let personal_access_tokens =
client.get_personal_access_tokens().await.unwrap();
@@ -164,14 +165,14 @@ pub async fn run(harness: &TestHarness) {
// 16. Login with the personal access tokens
let identity_info = client
- .login_with_personal_access_token(&raw_pat1.token)
+ .login_with_personal_access_token(raw_pat1.token.expose_secret())
.await
.unwrap();
assert_eq!(identity_info.user_id, 1);
let identity_info = client
- .login_with_personal_access_token(&raw_pat2.token)
+ .login_with_personal_access_token(raw_pat2.token.expose_secret())
.await
.unwrap();
diff --git a/core/integration/tests/state/file.rs
b/core/integration/tests/state/file.rs
index 9b1a9d181..838c8d4a6 100644
--- a/core/integration/tests/state/file.rs
+++ b/core/integration/tests/state/file.rs
@@ -21,6 +21,7 @@ use bytes::Bytes;
use iggy::prelude::BytesSerializable;
use iggy_common::create_stream::CreateStream;
use iggy_common::create_user::CreateUser;
+use secrecy::SecretString;
use server::state::command::EntryCommand;
use server::state::entry::StateEntry;
use server::state::models::{CreateStreamWithId, CreateUserWithId};
@@ -45,7 +46,7 @@ async fn should_apply_single_entry() {
user_id: 1,
command: CreateUser {
username: "test".to_string(),
- password: "secret".to_string(),
+ password: SecretString::from("secret"),
status: Default::default(),
permissions: None,
},
@@ -71,7 +72,7 @@ async fn should_apply_encrypted_entry() {
user_id: 1,
command: CreateUser {
username: "test".to_string(),
- password: "secret".to_string(),
+ password: SecretString::from("secret"),
status: Default::default(),
permissions: None,
},
@@ -103,7 +104,7 @@ async fn should_apply_multiple_entries() {
user_id: created_user_id,
command: CreateUser {
username: "test".to_string(),
- password: "secret".to_string(),
+ password: SecretString::from("secret"),
status: Default::default(),
permissions: None,
},
diff --git a/core/integration/tests/state/system.rs
b/core/integration/tests/state/system.rs
index d54146014..ca567de41 100644
--- a/core/integration/tests/state/system.rs
+++ b/core/integration/tests/state/system.rs
@@ -25,6 +25,7 @@ use iggy_common::create_stream::CreateStream;
use iggy_common::create_topic::CreateTopic;
use iggy_common::create_user::CreateUser;
use iggy_common::delete_stream::DeleteStream;
+use secrecy::{ExposeSecret, SecretString};
use server::state::command::EntryCommand;
use server::state::models::{
CreateConsumerGroupWithId, CreatePersonalAccessTokenWithHash,
CreateStreamWithId,
@@ -41,13 +42,13 @@ async fn should_be_initialized_based_on_state_entries() {
let user_id = 0;
let create_user = CreateUser {
username: "user".to_string(),
- password: "secret".to_string(),
+ password: SecretString::from("secret"),
status: Default::default(),
permissions: None,
};
let create_user_clone = CreateUser {
username: "user".to_string(),
- password: "secret".to_string(),
+ password: SecretString::from("secret"),
status: Default::default(),
permissions: None,
};
@@ -222,7 +223,10 @@ async fn should_be_initialized_based_on_state_entries() {
let mut user = system.users.remove(&user_id).unwrap();
assert_eq!(user.id, user_id);
assert_eq!(user.username, create_user_clone.username);
- assert_eq!(user.password_hash, create_user_clone.password);
+ assert_eq!(
+ user.password_hash,
+ create_user_clone.password.expose_secret()
+ );
assert_eq!(user.personal_access_tokens.len(), 1);
let personal_access_token = user
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index 6cdad3ad4..e0d024aaf 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -37,6 +37,7 @@ left-right = { workspace = true }
message_bus = { workspace = true }
paste = { workspace = true }
rmp-serde = { workspace = true }
+secrecy = { workspace = true }
serde = { workspace = true, features = ["derive"] }
slab = { workspace = true }
tracing = { workspace = true }
diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs
index 8e0ec5d8e..5ebf2ee5c 100644
--- a/core/metadata/src/stm/user.rs
+++ b/core/metadata/src/stm/user.rs
@@ -32,6 +32,7 @@ use iggy_common::{
GlobalPermissions, IggyTimestamp, Permissions, PersonalAccessToken,
StreamPermissions, UserId,
UserStatus,
};
+use secrecy::ExposeSecret;
use serde::{Deserialize, Serialize};
use slab::Slab;
use std::sync::Arc;
@@ -137,7 +138,7 @@ impl StateHandler for CreateUser {
let user = User {
id: 0,
username: username_arc.clone(),
- password_hash: Arc::from(self.password.as_str()),
+ password_hash: Arc::from(self.password.expose_secret()),
status: self.status,
created_at: iggy_common::IggyTimestamp::now(),
permissions: self.permissions.as_ref().map(|p|
Arc::new(p.clone())),
@@ -214,7 +215,7 @@ impl StateHandler for ChangePassword {
};
if let Some(user) = state.items.get_mut(user_id) {
- user.password_hash = Arc::from(self.new_password.as_str());
+ user.password_hash = Arc::from(self.new_password.expose_secret());
}
Bytes::new()
}
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index 4d2af3e6e..6bee2175f 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -45,6 +45,7 @@ reqwest-middleware = { workspace = true }
reqwest-retry = { workspace = true }
reqwest-tracing = { workspace = true }
rustls = { workspace = true }
+secrecy = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true }
tokio-rustls = { workspace = true }
diff --git a/core/sdk/src/client_provider.rs b/core/sdk/src/client_provider.rs
index fa93fd2c8..694351815 100644
--- a/core/sdk/src/client_provider.rs
+++ b/core/sdk/src/client_provider.rs
@@ -30,6 +30,7 @@ use iggy_common::{
AutoLogin, Credentials, TransportProtocol, WebSocketClientConfig,
WebSocketClientReconnectionConfig, WebSocketConfig,
};
+use secrecy::SecretString;
use std::str::FromStr;
use std::sync::Arc;
@@ -106,7 +107,7 @@ impl ClientProviderConfig {
auto_login: if auto_login {
AutoLogin::Enabled(Credentials::UsernamePassword(
args.username,
- args.password,
+ SecretString::from(args.password),
))
} else {
AutoLogin::Disabled
@@ -150,7 +151,7 @@ impl ClientProviderConfig {
auto_login: if auto_login {
AutoLogin::Enabled(Credentials::UsernamePassword(
args.username,
- args.password,
+ SecretString::from(args.password),
))
} else {
AutoLogin::Disabled
@@ -175,7 +176,7 @@ impl ClientProviderConfig {
auto_login: if auto_login {
AutoLogin::Enabled(Credentials::UsernamePassword(
args.username,
- args.password,
+ SecretString::from(args.password),
))
} else {
AutoLogin::Disabled
diff --git a/core/sdk/src/http/http_client.rs b/core/sdk/src/http/http_client.rs
index a6ddcaa65..e2142f4f5 100644
--- a/core/sdk/src/http/http_client.rs
+++ b/core/sdk/src/http/http_client.rs
@@ -29,6 +29,7 @@ use reqwest::{Response, StatusCode, Url};
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};
use reqwest_tracing::{SpanBackendWithUrl, TracingMiddleware};
+use secrecy::ExposeSecret;
use serde::Serialize;
use std::ops::Deref;
use std::str::FromStr;
@@ -249,7 +250,7 @@ impl HttpTransport for HttpClient {
}
let access_token = identity.access_token.as_ref().unwrap();
- self.set_access_token(Some(access_token.token.clone()))
+
self.set_access_token(Some(access_token.token.expose_secret().to_owned()))
.await;
Ok(())
}
diff --git a/core/sdk/src/http/personal_access_tokens.rs
b/core/sdk/src/http/personal_access_tokens.rs
index f9c6923a2..902f0baf4 100644
--- a/core/sdk/src/http/personal_access_tokens.rs
+++ b/core/sdk/src/http/personal_access_tokens.rs
@@ -26,6 +26,7 @@ use iggy_common::PersonalAccessTokenExpiry;
use iggy_common::create_personal_access_token::CreatePersonalAccessToken;
use
iggy_common::login_with_personal_access_token::LoginWithPersonalAccessToken;
use iggy_common::{PersonalAccessTokenInfo, RawPersonalAccessToken};
+use secrecy::SecretString;
const PATH: &str = "/personal-access-tokens";
@@ -74,7 +75,7 @@ impl PersonalAccessTokenClient for HttpClient {
.post(
&format!("{PATH}/login"),
&LoginWithPersonalAccessToken {
- token: token.to_string(),
+ token: SecretString::from(token),
},
)
.await?;
diff --git a/core/sdk/src/http/users.rs b/core/sdk/src/http/users.rs
index 496e94e86..08ad3b504 100644
--- a/core/sdk/src/http/users.rs
+++ b/core/sdk/src/http/users.rs
@@ -27,6 +27,7 @@ use iggy_common::login_user::LoginUser;
use iggy_common::update_permissions::UpdatePermissions;
use iggy_common::update_user::UpdateUser;
use iggy_common::{IdentityInfo, Permissions, UserInfo, UserInfoDetails,
UserStatus};
+use secrecy::SecretString;
const PATH: &str = "/users";
@@ -70,7 +71,7 @@ impl UserClient for HttpClient {
PATH,
&CreateUser {
username: username.to_string(),
- password: password.to_string(),
+ password: SecretString::from(password),
status,
permissions,
},
@@ -133,8 +134,8 @@ impl UserClient for HttpClient {
&format!("{PATH}/{}/password", &user_id.as_cow_str()),
&ChangePassword {
user_id: user_id.clone(),
- current_password: current_password.to_string(),
- new_password: new_password.to_string(),
+ current_password: SecretString::from(current_password),
+ new_password: SecretString::from(new_password),
},
)
.await?;
@@ -147,7 +148,7 @@ impl UserClient for HttpClient {
&format!("{PATH}/login"),
&LoginUser {
username: username.to_string(),
- password: password.to_string(),
+ password: SecretString::from(password),
version: Some(env!("CARGO_PKG_VERSION").to_string()),
context: Some("".to_string()),
},
diff --git a/core/sdk/src/quic/quic_client.rs b/core/sdk/src/quic/quic_client.rs
index 6cde88509..a53dab901 100644
--- a/core/sdk/src/quic/quic_client.rs
+++ b/core/sdk/src/quic/quic_client.rs
@@ -32,6 +32,7 @@ use iggy_common::{
use quinn::crypto::rustls::QuicClientConfig as QuinnQuicClientConfig;
use quinn::{ClientConfig, Connection, Endpoint, IdleTimeout, RecvStream,
VarInt};
use rustls::crypto::CryptoProvider;
+use secrecy::ExposeSecret;
use std::net::{SocketAddr, ToSocketAddrs};
use std::str::FromStr;
use std::sync::Arc;
@@ -393,7 +394,7 @@ impl QuicClient {
self.set_state(ClientState::Authenticating).await;
match credentials {
Credentials::UsernamePassword(username, password) => {
- self.login_user(username, password).await?;
+ self.login_user(username,
password.expose_secret()).await?;
self.publish_event(DiagnosticEvent::SignedIn).await;
info!(
"{NAME} client: {} has signed in with the user
credentials, username: {username}",
@@ -401,7 +402,8 @@ impl QuicClient {
);
}
Credentials::PersonalAccessToken(token) => {
-
self.login_with_personal_access_token(token).await?;
+
self.login_with_personal_access_token(token.expose_secret())
+ .await?;
self.publish_event(DiagnosticEvent::SignedIn).await;
info!(
"{NAME} client: {} has signed in with a
personal access token.",
@@ -785,13 +787,13 @@ mod tests {
quic_client_config.server_address,
format!("{server_address}:{port}")
);
- assert_eq!(
- quic_client_config.auto_login,
- AutoLogin::Enabled(Credentials::UsernamePassword(
- username.to_string(),
- password.to_string()
- ))
- );
+ match &quic_client_config.auto_login {
+ AutoLogin::Enabled(Credentials::UsernamePassword(u, p)) => {
+ assert_eq!(u, &username.to_string());
+ assert_eq!(p.expose_secret(), password);
+ }
+ other => panic!("expected UsernamePassword auto_login, got
{other:?}"),
+ }
assert_eq!(quic_client_config.response_buffer_size, 10_000_000);
assert_eq!(quic_client_config.max_concurrent_bidi_streams, 10_000);
@@ -840,13 +842,13 @@ mod tests {
quic_client_config.server_address,
format!("{server_address}:{port}")
);
- assert_eq!(
- quic_client_config.auto_login,
- AutoLogin::Enabled(Credentials::UsernamePassword(
- username.to_string(),
- password.to_string()
- ))
- );
+ match &quic_client_config.auto_login {
+ AutoLogin::Enabled(Credentials::UsernamePassword(u, p)) => {
+ assert_eq!(u, &username.to_string());
+ assert_eq!(p.expose_secret(), password);
+ }
+ other => panic!("expected UsernamePassword auto_login, got
{other:?}"),
+ }
assert_eq!(quic_client_config.response_buffer_size, 10_000_000);
assert_eq!(quic_client_config.max_concurrent_bidi_streams, 10_000);
@@ -893,10 +895,12 @@ mod tests {
quic_client_config.server_address,
format!("{server_address}:{port}")
);
- assert_eq!(
- quic_client_config.auto_login,
-
AutoLogin::Enabled(Credentials::PersonalAccessToken(pat.to_string()))
- );
+ match &quic_client_config.auto_login {
+ AutoLogin::Enabled(Credentials::PersonalAccessToken(t)) => {
+ assert_eq!(t.expose_secret(), pat);
+ }
+ other => panic!("expected PersonalAccessToken auto_login, got
{other:?}"),
+ }
assert_eq!(quic_client_config.response_buffer_size, 10_000_000);
assert_eq!(quic_client_config.max_concurrent_bidi_streams, 10_000);
diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs
index bd61df2ca..12ffb1c9a 100644
--- a/core/sdk/src/tcp/tcp_client.rs
+++ b/core/sdk/src/tcp/tcp_client.rs
@@ -32,6 +32,7 @@ use iggy_common::{
};
use iggy_common::{BinaryClient, BinaryTransport, PersonalAccessTokenClient,
UserClient};
use rustls::pki_types::{CertificateDer, ServerName, pem::PemObject};
+use secrecy::ExposeSecret;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
@@ -443,13 +444,14 @@ impl TcpClient {
self.set_state(ClientState::Authenticating).await;
match credentials {
Credentials::UsernamePassword(username, password) => {
- self.login_user(username, password).await?;
+ self.login_user(username,
password.expose_secret()).await?;
info!(
"{NAME} client: {client_address} has signed in
with the user credentials, username: {username}",
);
}
Credentials::PersonalAccessToken(token) => {
-
self.login_with_personal_access_token(token).await?;
+
self.login_with_personal_access_token(token.expose_secret())
+ .await?;
info!(
"{NAME} client: {client_address} has signed in
with a personal access token.",
);
@@ -766,13 +768,13 @@ mod tests {
tcp_client_config.server_address,
format!("{server_address}:{port}")
);
- assert_eq!(
- tcp_client_config.auto_login,
- AutoLogin::Enabled(Credentials::UsernamePassword(
- username.to_string(),
- password.to_string()
- ))
- );
+ match &tcp_client_config.auto_login {
+ AutoLogin::Enabled(Credentials::UsernamePassword(u, p)) => {
+ assert_eq!(u, &username.to_string());
+ assert_eq!(p.expose_secret(), password);
+ }
+ other => panic!("expected UsernamePassword auto_login, got
{other:?}"),
+ }
assert!(!tcp_client_config.tls_enabled);
assert!(tcp_client_config.tls_domain.is_empty());
@@ -815,13 +817,13 @@ mod tests {
tcp_client_config.server_address,
format!("{server_address}:{port}")
);
- assert_eq!(
- tcp_client_config.auto_login,
- AutoLogin::Enabled(Credentials::UsernamePassword(
- username.to_string(),
- password.to_string()
- ))
- );
+ match &tcp_client_config.auto_login {
+ AutoLogin::Enabled(Credentials::UsernamePassword(u, p)) => {
+ assert_eq!(u, &username.to_string());
+ assert_eq!(p.expose_secret(), password);
+ }
+ other => panic!("expected UsernamePassword auto_login, got
{other:?}"),
+ }
assert!(!tcp_client_config.tls_enabled);
assert!(tcp_client_config.tls_domain.is_empty());
@@ -862,10 +864,12 @@ mod tests {
tcp_client_config.server_address,
format!("{server_address}:{port}")
);
- assert_eq!(
- tcp_client_config.auto_login,
-
AutoLogin::Enabled(Credentials::PersonalAccessToken(pat.to_string()))
- );
+ match &tcp_client_config.auto_login {
+ AutoLogin::Enabled(Credentials::PersonalAccessToken(t)) => {
+ assert_eq!(t.expose_secret(), pat);
+ }
+ other => panic!("expected PersonalAccessToken auto_login, got
{other:?}"),
+ }
assert!(!tcp_client_config.tls_enabled);
assert!(tcp_client_config.tls_domain.is_empty());
diff --git a/core/sdk/src/websocket/websocket_client.rs
b/core/sdk/src/websocket/websocket_client.rs
index 0cbeb6b28..f86856d98 100644
--- a/core/sdk/src/websocket/websocket_client.rs
+++ b/core/sdk/src/websocket/websocket_client.rs
@@ -32,6 +32,7 @@ use iggy_common::{
WebSocketConnectionStringOptions,
};
use iggy_common::{BinaryClient, BinaryTransport, PersonalAccessTokenClient,
UserClient};
+use secrecy::ExposeSecret;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpStream;
@@ -496,14 +497,15 @@ impl WebSocketClient {
self.set_state(ClientState::Authenticating).await;
match credentials {
Credentials::UsernamePassword(username, password) => {
- self.login_user(username, password).await?;
+ self.login_user(username,
password.expose_secret()).await?;
info!(
"{NAME} client: {client_address} has signed in
with the user credentials, username: {username}",
);
Ok(())
}
Credentials::PersonalAccessToken(token) => {
- self.login_with_personal_access_token(token).await?;
+
self.login_with_personal_access_token(token.expose_secret())
+ .await?;
info!(
"{NAME} client: {client_address} has signed in
with a personal access token.",
);
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index c16ce5a6c..b1eb2a74f 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -85,6 +85,7 @@ rolling-file = { workspace = true }
rust-embed = { workspace = true, optional = true }
rustls = { workspace = true }
rustls-pemfile = { workspace = true }
+secrecy = { workspace = true }
send_wrapper = { workspace = true }
serde = { workspace = true }
slab = { workspace = true }
diff --git a/core/server/src/binary/command.rs
b/core/server/src/binary/command.rs
index 78a2b4417..cac319f59 100644
--- a/core/server/src/binary/command.rs
+++ b/core/server/src/binary/command.rs
@@ -424,9 +424,9 @@ mod tests {
let mut bytes = BytesMut::with_capacity(payload.len());
bytes.put_slice(&payload);
let bytes = Bytes::from(bytes);
- assert_eq!(
- &ServerCommand::from_code_and_payload(command_id, bytes).unwrap(),
- command
- );
+ let deserialized = ServerCommand::from_code_and_payload(command_id,
bytes).unwrap();
+ // SecretString doesn't implement PartialEq, so we compare serialized
bytes instead.
+ // This is sufficient since to_bytes() covers all
wire-protocol-relevant fields.
+ assert_eq!(deserialized.to_bytes(), command.to_bytes());
}
}
diff --git
a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
index db8a94f51..3a7475d3b 100644
---
a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
+++
b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
@@ -28,6 +28,7 @@ use crate::streaming::session::Session;
use err_trail::ErrContext;
use
iggy_common::login_with_personal_access_token::LoginWithPersonalAccessToken;
use iggy_common::{IggyError, SenderKind};
+use secrecy::ExposeSecret;
use tracing::{debug, instrument};
impl ServerCommandHandler for LoginWithPersonalAccessToken {
@@ -44,16 +45,12 @@ impl ServerCommandHandler for LoginWithPersonalAccessToken {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
+ let token = self.token.expose_secret();
let user = shard
- .login_with_personal_access_token(&self.token, Some(session))
+ .login_with_personal_access_token(token, Some(session))
.error(|e: &IggyError| {
- let redacted_token = if self.token.len() > 4 {
- format!("{}****", &self.token[..4])
- } else {
- "****".to_string()
- };
format!(
- "{COMPONENT} (error: {e}) - failed to login with personal
access token: {redacted_token}, session: {session}",
+ "{COMPONENT} (error: {e}) - failed to login with personal
access token, session: {session}",
)
})?;
let identity_info = mapper::map_identity_info(user.id);
diff --git a/core/server/src/binary/handlers/users/login_user_handler.rs
b/core/server/src/binary/handlers/users/login_user_handler.rs
index 89a981c2a..4836ffc7c 100644
--- a/core/server/src/binary/handlers/users/login_user_handler.rs
+++ b/core/server/src/binary/handlers/users/login_user_handler.rs
@@ -27,6 +27,7 @@ use crate::streaming::session::Session;
use err_trail::ErrContext;
use iggy_common::login_user::LoginUser;
use iggy_common::{IggyError, SenderKind};
+use secrecy::ExposeSecret;
use std::rc::Rc;
use tracing::{debug, info, instrument, warn};
@@ -54,7 +55,7 @@ impl ServerCommandHandler for LoginUser {
} = self;
info!("Logging in user: {} ...", &username);
let user = shard
- .login_user(&username, &password, Some(session))
+ .login_user(&username, password.expose_secret(), Some(session))
.error(|e: &IggyError| {
format!(
"{COMPONENT} (error: {e}) - failed to login user with
name: {}, session: {session}",
diff --git a/core/server/src/binary/macros.rs b/core/server/src/binary/macros.rs
index 22e5a6677..655705088 100644
--- a/core/server/src/binary/macros.rs
+++ b/core/server/src/binary/macros.rs
@@ -33,7 +33,7 @@ macro_rules! define_server_command_enum {
);* $(;)?
) => {
#[enum_dispatch(ServerCommandHandler)]
- #[derive(Debug, PartialEq, EnumString, EnumIter)]
+ #[derive(Debug, EnumString, EnumIter)]
pub enum ServerCommand {
$(
$variant($ty),
diff --git a/core/server/src/http/mapper.rs b/core/server/src/http/mapper.rs
index 879d98338..04bc16396 100644
--- a/core/server/src/http/mapper.rs
+++ b/core/server/src/http/mapper.rs
@@ -24,6 +24,7 @@ use iggy_common::PersonalAccessToken;
use iggy_common::{ConsumerGroupDetails, ConsumerGroupInfo,
ConsumerGroupMember, IggyByteSize};
use iggy_common::{IdentityInfo, PersonalAccessTokenInfo, TokenInfo,
TopicDetails};
use iggy_common::{UserInfo, UserInfoDetails};
+use secrecy::SecretString;
pub fn map_user(user: &User) -> UserInfoDetails {
UserInfoDetails {
@@ -105,7 +106,7 @@ pub fn map_generated_access_token_to_identity_info(token:
GeneratedToken) -> Ide
IdentityInfo {
user_id: token.user_id,
access_token: Some(TokenInfo {
- token: token.access_token,
+ token: SecretString::from(token.access_token),
expiry: token.access_token_expiry,
}),
}
diff --git a/core/server/src/http/personal_access_tokens.rs
b/core/server/src/http/personal_access_tokens.rs
index 209d9237a..1ff235ddb 100644
--- a/core/server/src/http/personal_access_tokens.rs
+++ b/core/server/src/http/personal_access_tokens.rs
@@ -36,6 +36,7 @@ use
iggy_common::create_personal_access_token::CreatePersonalAccessToken;
use iggy_common::delete_personal_access_token::DeletePersonalAccessToken;
use
iggy_common::login_with_personal_access_token::LoginWithPersonalAccessToken;
use iggy_common::{IggyError, RawPersonalAccessToken};
+use secrecy::{ExposeSecret, SecretString};
use std::sync::Arc;
use tracing::instrument;
@@ -92,7 +93,9 @@ async fn create_personal_access_token(
match state.shard.send_to_control_plane(request).await? {
ShardResponse::CreatePersonalAccessTokenResponse(_, token) => {
- Ok(Json(RawPersonalAccessToken { token }))
+ Ok(Json(RawPersonalAccessToken {
+ token: SecretString::from(token),
+ }))
}
ShardResponse::ErrorResponse(err) => Err(err.into()),
_ => unreachable!("Expected CreatePersonalAccessTokenResponse"),
@@ -129,7 +132,7 @@ async fn login_with_personal_access_token(
let user = state
.shard
.shard()
- .login_with_personal_access_token(&command.token, None)
+ .login_with_personal_access_token(command.token.expose_secret(), None)
.error(|e: &IggyError| {
format!("{COMPONENT} (error: {e}) - failed to login with personal
access token")
})?;
diff --git a/core/server/src/http/users.rs b/core/server/src/http/users.rs
index ea0db9b9f..b8d577342 100644
--- a/core/server/src/http/users.rs
+++ b/core/server/src/http/users.rs
@@ -41,6 +41,7 @@ use iggy_common::Identifier;
use iggy_common::IdentityInfo;
use iggy_common::Validatable;
use iggy_common::{IggyError, UserInfo, UserInfoDetails};
+use secrecy::ExposeSecret;
use send_wrapper::SendWrapper;
use serde::Deserialize;
use std::sync::Arc;
@@ -229,7 +230,7 @@ async fn login_user(
let user = state
.shard
.shard()
- .login_user(&command.username, &command.password, None)
+ .login_user(&command.username, command.password.expose_secret(), None)
.error(|e: &IggyError| {
format!(
"{COMPONENT} (error: {e}) - failed to login, username: {}",
diff --git a/core/server/src/shard/execution.rs
b/core/server/src/shard/execution.rs
index 2989b2c6d..1cdaea83d 100644
--- a/core/server/src/shard/execution.rs
+++ b/core/server/src/shard/execution.rs
@@ -46,6 +46,7 @@ use iggy_common::{
purge_stream::PurgeStream, purge_topic::PurgeTopic,
update_permissions::UpdatePermissions,
update_stream::UpdateStream, update_topic::UpdateTopic,
update_user::UpdateUser,
};
+use secrecy::{ExposeSecret, SecretString};
pub struct DeleteStreamResult {
pub stream_id: usize,
}
@@ -558,7 +559,7 @@ pub async fn execute_create_user(
let user = shard.create_user(
&command.username,
- &command.password,
+ command.password.expose_secret(),
command.status,
command.permissions.clone(),
)?;
@@ -570,7 +571,9 @@ pub async fn execute_create_user(
&EntryCommand::CreateUser(CreateUserWithId {
user_id: user.id,
command: iggy_common::create_user::CreateUser {
- password: crypto::hash_password(&command.password),
+ password: SecretString::from(crypto::hash_password(
+ command.password.expose_secret(),
+ )),
..command
},
}),
@@ -626,8 +629,8 @@ pub async fn execute_change_password(
shard.change_password(
&command.user_id,
- &command.current_password,
- &command.new_password,
+ command.current_password.expose_secret(),
+ command.new_password.expose_secret(),
)?;
shard
@@ -635,8 +638,10 @@ pub async fn execute_change_password(
.apply(
user_id,
&EntryCommand::ChangePassword(ChangePassword {
- current_password: String::new(),
- new_password: crypto::hash_password(&command.new_password),
+ current_password: SecretString::from(String::new()),
+ new_password: SecretString::from(crypto::hash_password(
+ command.new_password.expose_secret(),
+ )),
..command
}),
)
diff --git a/core/server/src/state/command.rs b/core/server/src/state/command.rs
index 0cf32621f..b9783d2a6 100644
--- a/core/server/src/state/command.rs
+++ b/core/server/src/state/command.rs
@@ -47,7 +47,7 @@ use iggy_common::{
};
use std::fmt::{Display, Formatter};
-#[derive(Debug, PartialEq)]
+#[derive(Debug)]
pub enum EntryCommand {
CreateStream(CreateStreamWithId),
UpdateStream(UpdateStream),
diff --git a/core/server/src/state/models.rs b/core/server/src/state/models.rs
index 8af43ec88..e0b97106f 100644
--- a/core/server/src/state/models.rs
+++ b/core/server/src/state/models.rs
@@ -51,7 +51,7 @@ pub struct CreateConsumerGroupWithId {
pub command: CreateConsumerGroup,
}
-#[derive(Debug, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, Serialize, Deserialize)]
pub struct CreateUserWithId {
pub user_id: u32,
pub command: CreateUser,
diff --git a/core/server/src/state/system.rs b/core/server/src/state/system.rs
index 35a560164..666d62475 100644
--- a/core/server/src/state/system.rs
+++ b/core/server/src/state/system.rs
@@ -31,6 +31,7 @@ use iggy_common::PersonalAccessToken;
use iggy_common::create_user::CreateUser;
use iggy_common::defaults::DEFAULT_ROOT_USER_ID;
use iggy_common::{IdKind, Identifier, Permissions, UserStatus};
+use secrecy::{ExposeSecret, SecretString};
use std::collections::BTreeMap;
use std::fmt::Display;
use tracing::{debug, error, info};
@@ -74,14 +75,26 @@ pub struct PartitionState {
pub created_at: IggyTimestamp,
}
-#[derive(Debug, Clone)]
+// TODO: consider converting token_hash to SecretString (requires updating the
full hash flow across crates)
+#[derive(Clone)]
pub struct PersonalAccessTokenState {
pub name: String,
pub token_hash: String,
pub expiry_at: Option<IggyTimestamp>,
}
-#[derive(Debug, Clone)]
+impl std::fmt::Debug for PersonalAccessTokenState {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("PersonalAccessTokenState")
+ .field("name", &self.name)
+ .field("token_hash", &"[REDACTED]")
+ .field("expiry_at", &self.expiry_at)
+ .finish()
+ }
+}
+
+// TODO: consider converting password_hash to SecretString (requires updating
the full hash flow across crates)
+#[derive(Clone)]
pub struct UserState {
pub id: u32,
pub username: String,
@@ -92,6 +105,20 @@ pub struct UserState {
pub personal_access_tokens: AHashMap<String, PersonalAccessTokenState>,
}
+impl std::fmt::Debug for UserState {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("UserState")
+ .field("id", &self.id)
+ .field("username", &self.username)
+ .field("password_hash", &"[REDACTED]")
+ .field("status", &self.status)
+ .field("created_at", &self.created_at)
+ .field("permissions", &self.permissions)
+ .field("personal_access_tokens", &self.personal_access_tokens)
+ .finish()
+ }
+}
+
#[derive(Debug, Clone)]
pub struct ConsumerGroupState {
pub id: u32,
@@ -122,7 +149,7 @@ impl SystemState {
let root = create_root_user();
let command = CreateUser {
username: root.username.clone(),
- password: root.password.clone(),
+ password: SecretString::from(root.password.clone()),
status: root.status,
permissions: root.permissions.clone(),
};
@@ -379,7 +406,7 @@ impl SystemState {
let user = UserState {
id: user_id,
username: command.username,
- password_hash: command.password, // This is already
hashed
+ password_hash:
command.password.expose_secret().to_owned(), // This is already hashed
status: command.status,
created_at: entry.timestamp,
permissions: command.permissions,
@@ -408,7 +435,7 @@ impl SystemState {
let user = users
.get_mut(&user_id)
.unwrap_or_else(|| panic!("{}", format!("User:
{user_id} not found")));
- user.password_hash = command.new_password // This is
already hashed
+ user.password_hash =
command.new_password.expose_secret().to_owned() // This is already hashed
}
EntryCommand::UpdatePermissions(command) => {
let user_id = find_user_id(&users, &command.user_id);