This is an automated email from the ASF dual-hosted git repository. maciej pushed a commit to branch connectors-mcp-token-file in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 912778da5acfd30e124a7405144d72bfa3b0c136 Author: Maciej Modzelewski <[email protected]> AuthorDate: Thu Jan 15 11:51:24 2026 +0100 feat(connectors,mcp): Implement reading iggy token from file --- Cargo.lock | 2 ++ core/ai/mcp/Cargo.toml | 1 + core/ai/mcp/src/error.rs | 6 ++++ core/ai/mcp/src/stream.rs | 51 +++++++++++++++++++++++++++-- core/connectors/runtime/Cargo.toml | 1 + core/connectors/runtime/src/error.rs | 9 ++++++ core/connectors/runtime/src/stream.rs | 61 ++++++++++++++++++++++++++++++++--- 7 files changed, 124 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a9e03e889..cb266009f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4592,6 +4592,7 @@ dependencies = [ "chrono", "clap", "dashmap", + "dirs", "dlopen2", "dotenvy", "figlet-rs", @@ -4627,6 +4628,7 @@ version = "0.2.1-edge.1" dependencies = [ "axum", "axum-server", + "dirs", "dotenvy", "figlet-rs", "figment", diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml index b1046754d..164d940f9 100644 --- a/core/ai/mcp/Cargo.toml +++ b/core/ai/mcp/Cargo.toml @@ -29,6 +29,7 @@ readme = "README.md" [dependencies] axum = { workspace = true } axum-server = { workspace = true } +dirs = { workspace = true } dotenvy = { workspace = true } figlet-rs = { workspace = true } figment = { workspace = true } diff --git a/core/ai/mcp/src/error.rs b/core/ai/mcp/src/error.rs index 950056020..966397315 100644 --- a/core/ai/mcp/src/error.rs +++ b/core/ai/mcp/src/error.rs @@ -37,4 +37,10 @@ pub enum McpRuntimeError { FailedToCreateConsumerId, #[error("Invalid API path")] InvalidApiPath, + #[error("Token file not found: {0}")] + TokenFileNotFound(String), + #[error("Failed to read token file '{0}': {1}")] + TokenFileReadError(String, String), + #[error("Token file is empty: {0}")] + TokenFileEmpty(String), } diff --git a/core/ai/mcp/src/stream.rs b/core/ai/mcp/src/stream.rs index 9ef9be6f4..aa5a8c858 100644 --- a/core/ai/mcp/src/stream.rs +++ b/core/ai/mcp/src/stream.rs @@ -19,15 +19,62 @@ use crate::{configs::IggyConfig, error::McpRuntimeError}; use iggy::prelude::{Client, IggyClient, IggyClientBuilder}; +use std::path::PathBuf; use tracing::{error, info}; +const TOKEN_FILE_PREFIX: &str = "file:"; + +fn expand_home(path: &str) -> PathBuf { + if let Some(rest) = path.strip_prefix("~/") { + if let Some(home) = dirs::home_dir() { + return home.join(rest); + } + } else if path == "~" + && let Some(home) = dirs::home_dir() + { + return home; + } + PathBuf::from(path) +} + +fn resolve_token(token: &str) -> Result<String, McpRuntimeError> { + if let Some(path) = token.strip_prefix(TOKEN_FILE_PREFIX) { + let file_path = expand_home(path); + + if !file_path.exists() { + error!("Token file does not exist: {}", path); + return Err(McpRuntimeError::TokenFileNotFound(path.to_string())); + } + + let content = std::fs::read_to_string(&file_path).map_err(|e| { + error!("Failed to read token file '{}': {}", path, e); + McpRuntimeError::TokenFileReadError(path.to_string(), e.to_string()) + })?; + + let trimmed = content.trim().to_string(); + + if trimmed.is_empty() { + error!("Token file is empty: {}", path); + return Err(McpRuntimeError::TokenFileEmpty(path.to_string())); + } + + Ok(trimmed) + } else { + Ok(token.to_string()) + } +} + pub async fn init(config: IggyConfig) -> Result<IggyClient, McpRuntimeError> { let address = config.address; let username = config.username; let password = config.password; - let token = config.token; + let token = if config.token.is_empty() { + None + } else { + Some(resolve_token(&config.token)?) + }; - let connection_string = if !token.is_empty() { + let connection_string = if let Some(token) = token { let redacted_token = token.chars().take(3).collect::<String>(); info!("Using token: {redacted_token}*** for Iggy authentication"); format!("iggy://{token}@{address}") diff --git a/core/connectors/runtime/Cargo.toml b/core/connectors/runtime/Cargo.toml index 7c0b2ff43..28cbf24c1 100644 --- a/core/connectors/runtime/Cargo.toml +++ b/core/connectors/runtime/Cargo.toml @@ -35,6 +35,7 @@ axum-server = { workspace = true } chrono = { workspace = true } clap = { workspace = true } dashmap = { workspace = true } +dirs = { workspace = true } dlopen2 = { workspace = true } dotenvy = { workspace = true } figlet-rs = { workspace = true } diff --git a/core/connectors/runtime/src/error.rs b/core/connectors/runtime/src/error.rs index b392b60b6..0f6705faf 100644 --- a/core/connectors/runtime/src/error.rs +++ b/core/connectors/runtime/src/error.rs @@ -57,6 +57,12 @@ pub enum RuntimeError { IoError(#[from] std::io::Error), #[error("HTTP request failed: {0}")] HttpRequestFailed(String), + #[error("Token file not found: {0}")] + TokenFileNotFound(String), + #[error("Failed to read token file '{0}': {1}")] + TokenFileReadError(String, String), + #[error("Token file is empty: {0}")] + TokenFileEmpty(String), } impl RuntimeError { @@ -69,6 +75,9 @@ impl RuntimeError { RuntimeError::MissingIggyCredentials => "invalid_configuration", RuntimeError::InvalidConfiguration(_) => "invalid_configuration", RuntimeError::HttpRequestFailed(_) => "http_request_failed", + RuntimeError::TokenFileNotFound(_) => "invalid_configuration", + RuntimeError::TokenFileReadError(_, _) => "invalid_configuration", + RuntimeError::TokenFileEmpty(_) => "invalid_configuration", _ => "error", } } diff --git a/core/connectors/runtime/src/stream.rs b/core/connectors/runtime/src/stream.rs index 79a5b8b98..e8d1b20d5 100644 --- a/core/connectors/runtime/src/stream.rs +++ b/core/connectors/runtime/src/stream.rs @@ -18,30 +18,81 @@ */ use iggy::prelude::{Client, IggyClient, IggyClientBuilder}; +use std::path::PathBuf; use tracing::{error, info}; use crate::configs::runtime::IggyConfig; use crate::error::RuntimeError; +const TOKEN_FILE_PREFIX: &str = "file:"; + +fn expand_home(path: &str) -> PathBuf { + if let Some(rest) = path.strip_prefix("~/") { + if let Some(home) = dirs::home_dir() { + return home.join(rest); + } + } else if path == "~" + && let Some(home) = dirs::home_dir() + { + return home; + } + PathBuf::from(path) +} + +fn resolve_token(token: &str) -> Result<String, RuntimeError> { + if let Some(path) = token.strip_prefix(TOKEN_FILE_PREFIX) { + let file_path = expand_home(path); + + if !file_path.exists() { + error!("Token file does not exist: {}", path); + return Err(RuntimeError::TokenFileNotFound(path.to_string())); + } + + let content = std::fs::read_to_string(&file_path).map_err(|e| { + error!("Failed to read token file '{}': {}", path, e); + RuntimeError::TokenFileReadError(path.to_string(), e.to_string()) + })?; + + let trimmed = content.trim().to_string(); + + if trimmed.is_empty() { + error!("Token file is empty: {}", path); + return Err(RuntimeError::TokenFileEmpty(path.to_string())); + } + + Ok(trimmed) + } else { + Ok(token.to_string()) + } +} + pub struct IggyClients { pub producer: IggyClient, pub consumer: IggyClient, } pub async fn init(config: IggyConfig) -> Result<IggyClients, RuntimeError> { - let consumer = create_client(&config).await?; - let producer = create_client(&config).await?; + let token = if config.token.is_empty() { + None + } else { + Some(resolve_token(&config.token)?) + }; + + let consumer = create_client(&config, token.as_deref()).await?; + let producer = create_client(&config, token.as_deref()).await?; let iggy_clients = IggyClients { producer, consumer }; Ok(iggy_clients) } -async fn create_client(config: &IggyConfig) -> Result<IggyClient, RuntimeError> { +async fn create_client( + config: &IggyConfig, + token: Option<&str>, +) -> Result<IggyClient, RuntimeError> { let address = config.address.to_owned(); let username = config.username.to_owned(); let password = config.password.to_owned(); - let token = config.token.to_owned(); - let connection_string = if !token.is_empty() { + let connection_string = if let Some(token) = token { let redacted_token = token.chars().take(3).collect::<String>(); info!("Using token: {redacted_token}*** for Iggy authentication"); format!("iggy://{token}@{address}")
