This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 8eebb4108 feat(connectors,mcp): Implement reading iggy token from file (#2565)
8eebb4108 is described below
commit 8eebb41082533cc0f1430055726458b2036a2a46
Author: Maciej Modzelewski <[email protected]>
AuthorDate: Thu Jan 15 14:32:51 2026 +0100
feat(connectors,mcp): Implement reading iggy token from file (#2565)
---
Cargo.lock | 8 +-
DEPENDENCIES.md | 4 +-
core/ai/mcp/Cargo.toml | 6 +-
core/ai/mcp/src/error.rs | 6 ++
core/ai/mcp/src/stream.rs | 155 +++++++++++++++++++++++++++++++-
core/connectors/runtime/Cargo.toml | 6 +-
core/connectors/runtime/src/error.rs | 9 ++
core/connectors/runtime/src/stream.rs | 165 ++++++++++++++++++++++++++++++++--
8 files changed, 346 insertions(+), 13 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index a9e03e889..c333aaabe 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4584,7 +4584,7 @@ dependencies = [
[[package]]
name = "iggy-connectors"
-version = "0.2.1-edge.1"
+version = "0.2.1-edge.2"
dependencies = [
"async-trait",
"axum",
@@ -4592,6 +4592,7 @@ dependencies = [
"chrono",
"clap",
"dashmap",
+ "dirs",
"dlopen2",
"dotenvy",
"figlet-rs",
@@ -4613,6 +4614,7 @@ dependencies = [
"serde_with",
"serde_yaml_ng",
"strum 0.27.2",
+ "tempfile",
"thiserror 2.0.17",
"tokio",
"toml 0.9.10+spec-1.1.0",
@@ -4623,10 +4625,11 @@ dependencies = [
[[package]]
name = "iggy-mcp"
-version = "0.2.1-edge.1"
+version = "0.2.1-edge.2"
dependencies = [
"axum",
"axum-server",
+ "dirs",
"dotenvy",
"figlet-rs",
"figment",
@@ -4636,6 +4639,7 @@ dependencies = [
"serde",
"serde_json",
"strum 0.27.2",
+ "tempfile",
"thiserror 2.0.17",
"tokio",
"tower-http",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index ac097d204..2da51928b 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -396,8 +396,8 @@ iggy: 0.8.1-edge.3, "Apache-2.0",
iggy-bench: 0.3.1-edge.1, "Apache-2.0",
iggy-bench-dashboard-server: 0.5.1-edge.1, "Apache-2.0",
iggy-cli: 0.10.1-edge.1, "Apache-2.0",
-iggy-connectors: 0.2.1-edge.1, "Apache-2.0",
-iggy-mcp: 0.2.1-edge.1, "Apache-2.0",
+iggy-connectors: 0.2.1-edge.2, "Apache-2.0",
+iggy-mcp: 0.2.1-edge.2, "Apache-2.0",
iggy_binary_protocol: 0.8.1-edge.1, "Apache-2.0",
iggy_common: 0.8.1-edge.1, "Apache-2.0",
iggy_connector_elasticsearch_sink: 0.1.0, "Apache-2.0",
diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml
index b1046754d..b5cb5b0e0 100644
--- a/core/ai/mcp/Cargo.toml
+++ b/core/ai/mcp/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy-mcp"
-version = "0.2.1-edge.1"
+version = "0.2.1-edge.2"
description = "MCP Server for Iggy message streaming platform"
edition = "2024"
license = "Apache-2.0"
@@ -29,6 +29,7 @@ readme = "README.md"
[dependencies]
axum = { workspace = true }
axum-server = { workspace = true }
+dirs = { workspace = true }
dotenvy = { workspace = true }
figlet-rs = { workspace = true }
figment = { workspace = true }
@@ -47,3 +48,6 @@ tokio = { workspace = true }
tower-http = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
+
+[dev-dependencies]
+tempfile = { workspace = true }
diff --git a/core/ai/mcp/src/error.rs b/core/ai/mcp/src/error.rs
index 950056020..966397315 100644
--- a/core/ai/mcp/src/error.rs
+++ b/core/ai/mcp/src/error.rs
@@ -37,4 +37,10 @@ pub enum McpRuntimeError {
FailedToCreateConsumerId,
#[error("Invalid API path")]
InvalidApiPath,
+ #[error("Token file not found: {0}")]
+ TokenFileNotFound(String),
+ #[error("Failed to read token file '{0}': {1}")]
+ TokenFileReadError(String, String),
+ #[error("Token file is empty: {0}")]
+ TokenFileEmpty(String),
}
diff --git a/core/ai/mcp/src/stream.rs b/core/ai/mcp/src/stream.rs
index 9ef9be6f4..184c44ec8 100644
--- a/core/ai/mcp/src/stream.rs
+++ b/core/ai/mcp/src/stream.rs
@@ -19,15 +19,62 @@
use crate::{configs::IggyConfig, error::McpRuntimeError};
use iggy::prelude::{Client, IggyClient, IggyClientBuilder};
+use std::path::PathBuf;
use tracing::{error, info};
+const TOKEN_FILE_PREFIX: &str = "file:";
+
+fn expand_home(path: &str) -> PathBuf {
+ if let Some(rest) = path.strip_prefix("~/") {
+ if let Some(home) = dirs::home_dir() {
+ return home.join(rest);
+ }
+ } else if path == "~"
+ && let Some(home) = dirs::home_dir()
+ {
+ return home;
+ }
+ PathBuf::from(path)
+}
+
+fn resolve_token(token: &str) -> Result<String, McpRuntimeError> {
+ if let Some(path) = token.strip_prefix(TOKEN_FILE_PREFIX) {
+ let file_path = expand_home(path);
+
+ if !file_path.exists() {
+ error!("Token file does not exist: {}", path);
+ return Err(McpRuntimeError::TokenFileNotFound(path.to_string()));
+ }
+
+ let content = std::fs::read_to_string(&file_path).map_err(|e| {
+ error!("Failed to read token file '{}': {}", path, e);
+ McpRuntimeError::TokenFileReadError(path.to_string(), e.to_string())
+ })?;
+
+ let trimmed = content.trim().to_string();
+
+ if trimmed.is_empty() {
+ error!("Token file is empty: {}", path);
+ return Err(McpRuntimeError::TokenFileEmpty(path.to_string()));
+ }
+
+ Ok(trimmed)
+ } else {
+ Ok(token.to_string())
+ }
+}
+
pub async fn init(config: IggyConfig) -> Result<IggyClient, McpRuntimeError> {
let address = config.address;
let username = config.username;
let password = config.password;
- let token = config.token;
+ let token = if config.token.is_empty() {
+ None
+ } else {
+ Some(resolve_token(&config.token)?)
+ };
- let connection_string = if !token.is_empty() {
+ let connection_string = if let Some(token) = token {
let redacted_token = token.chars().take(3).collect::<String>();
info!("Using token: {redacted_token}*** for Iggy authentication");
format!("iggy://{token}@{address}")
@@ -73,3 +120,107 @@ pub async fn init(config: IggyConfig) -> Result<IggyClient, McpRuntimeError> {
client.connect().await?;
Ok(client)
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::io::Write;
+ use tempfile::NamedTempFile;
+
+ #[test]
+ fn test_expand_home_with_tilde_prefix() {
+ let path = "~/some/path";
+ let result = expand_home(path);
+
+ if let Some(home) = dirs::home_dir() {
+ assert_eq!(result, home.join("some/path"));
+ } else {
+ assert_eq!(result, PathBuf::from(path));
+ }
+ }
+
+ #[test]
+ fn test_expand_home_with_only_tilde() {
+ let path = "~";
+ let result = expand_home(path);
+
+ if let Some(home) = dirs::home_dir() {
+ assert_eq!(result, home);
+ } else {
+ assert_eq!(result, PathBuf::from(path));
+ }
+ }
+
+ #[test]
+ fn test_expand_home_without_tilde() {
+ let path = "/absolute/path";
+ let result = expand_home(path);
+ assert_eq!(result, PathBuf::from("/absolute/path"));
+ }
+
+ #[test]
+ fn test_expand_home_relative_path() {
+ let path = "relative/path";
+ let result = expand_home(path);
+ assert_eq!(result, PathBuf::from("relative/path"));
+ }
+
+ #[test]
+ fn test_resolve_token_direct_value() {
+ let token = "my-secret-token";
+ let result = resolve_token(token).unwrap();
+ assert_eq!(result, "my-secret-token");
+ }
+
+ #[test]
+ fn test_resolve_token_from_file() {
+ let mut temp_file = NamedTempFile::new().unwrap();
+ writeln!(temp_file, "token-from-file").unwrap();
+
+ let token = format!("file:{}", temp_file.path().display());
+ let result = resolve_token(&token).unwrap();
+ assert_eq!(result, "token-from-file");
+ }
+
+ #[test]
+ fn test_resolve_token_from_file_trims_whitespace() {
+ let mut temp_file = NamedTempFile::new().unwrap();
+ writeln!(temp_file, " token-with-spaces \n\n").unwrap();
+
+ let token = format!("file:{}", temp_file.path().display());
+ let result = resolve_token(&token).unwrap();
+ assert_eq!(result, "token-with-spaces");
+ }
+
+ #[test]
+ fn test_resolve_token_file_not_found() {
+ let token = "file:/nonexistent/path/to/token.txt";
+ let result = resolve_token(token);
+
+ assert!(result.is_err());
+ assert!(matches!(result, Err(McpRuntimeError::TokenFileNotFound(_))));
+ }
+
+ #[test]
+ fn test_resolve_token_empty_file() {
+ let temp_file = NamedTempFile::new().unwrap();
+
+ let token = format!("file:{}", temp_file.path().display());
+ let result = resolve_token(&token);
+
+ assert!(result.is_err());
+ assert!(matches!(result, Err(McpRuntimeError::TokenFileEmpty(_))));
+ }
+
+ #[test]
+ fn test_resolve_token_file_with_only_whitespace() {
+ let mut temp_file = NamedTempFile::new().unwrap();
+ writeln!(temp_file, " \n\t\n ").unwrap();
+
+ let token = format!("file:{}", temp_file.path().display());
+ let result = resolve_token(&token);
+
+ assert!(result.is_err());
+ assert!(matches!(result, Err(McpRuntimeError::TokenFileEmpty(_))));
+ }
+}
diff --git a/core/connectors/runtime/Cargo.toml b/core/connectors/runtime/Cargo.toml
index 7c0b2ff43..2f2d18971 100644
--- a/core/connectors/runtime/Cargo.toml
+++ b/core/connectors/runtime/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy-connectors"
-version = "0.2.1-edge.1"
+version = "0.2.1-edge.2"
description = "Connectors runtime for Iggy message streaming platform"
edition = "2024"
license = "Apache-2.0"
@@ -35,6 +35,7 @@ axum-server = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
dashmap = { workspace = true }
+dirs = { workspace = true }
dlopen2 = { workspace = true }
dotenvy = { workspace = true }
figlet-rs = { workspace = true }
@@ -62,3 +63,6 @@ toml = { workspace = true }
tower-http = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
+
+[dev-dependencies]
+tempfile = { workspace = true }
diff --git a/core/connectors/runtime/src/error.rs b/core/connectors/runtime/src/error.rs
index b392b60b6..0f6705faf 100644
--- a/core/connectors/runtime/src/error.rs
+++ b/core/connectors/runtime/src/error.rs
@@ -57,6 +57,12 @@ pub enum RuntimeError {
IoError(#[from] std::io::Error),
#[error("HTTP request failed: {0}")]
HttpRequestFailed(String),
+ #[error("Token file not found: {0}")]
+ TokenFileNotFound(String),
+ #[error("Failed to read token file '{0}': {1}")]
+ TokenFileReadError(String, String),
+ #[error("Token file is empty: {0}")]
+ TokenFileEmpty(String),
}
impl RuntimeError {
@@ -69,6 +75,9 @@ impl RuntimeError {
RuntimeError::MissingIggyCredentials => "invalid_configuration",
RuntimeError::InvalidConfiguration(_) => "invalid_configuration",
RuntimeError::HttpRequestFailed(_) => "http_request_failed",
+ RuntimeError::TokenFileNotFound(_) => "invalid_configuration",
+ RuntimeError::TokenFileReadError(_, _) => "invalid_configuration",
+ RuntimeError::TokenFileEmpty(_) => "invalid_configuration",
_ => "error",
}
}
diff --git a/core/connectors/runtime/src/stream.rs b/core/connectors/runtime/src/stream.rs
index 79a5b8b98..c2732d597 100644
--- a/core/connectors/runtime/src/stream.rs
+++ b/core/connectors/runtime/src/stream.rs
@@ -18,30 +18,81 @@
*/
use iggy::prelude::{Client, IggyClient, IggyClientBuilder};
+use std::path::PathBuf;
use tracing::{error, info};
use crate::configs::runtime::IggyConfig;
use crate::error::RuntimeError;
+const TOKEN_FILE_PREFIX: &str = "file:";
+
+fn expand_home(path: &str) -> PathBuf {
+ if let Some(rest) = path.strip_prefix("~/") {
+ if let Some(home) = dirs::home_dir() {
+ return home.join(rest);
+ }
+ } else if path == "~"
+ && let Some(home) = dirs::home_dir()
+ {
+ return home;
+ }
+ PathBuf::from(path)
+}
+
+fn resolve_token(token: &str) -> Result<String, RuntimeError> {
+ if let Some(path) = token.strip_prefix(TOKEN_FILE_PREFIX) {
+ let file_path = expand_home(path);
+
+ if !file_path.exists() {
+ error!("Token file does not exist: {}", path);
+ return Err(RuntimeError::TokenFileNotFound(path.to_string()));
+ }
+
+ let content = std::fs::read_to_string(&file_path).map_err(|e| {
+ error!("Failed to read token file '{}': {}", path, e);
+ RuntimeError::TokenFileReadError(path.to_string(), e.to_string())
+ })?;
+
+ let trimmed = content.trim().to_string();
+
+ if trimmed.is_empty() {
+ error!("Token file is empty: {}", path);
+ return Err(RuntimeError::TokenFileEmpty(path.to_string()));
+ }
+
+ Ok(trimmed)
+ } else {
+ Ok(token.to_string())
+ }
+}
+
pub struct IggyClients {
pub producer: IggyClient,
pub consumer: IggyClient,
}
pub async fn init(config: IggyConfig) -> Result<IggyClients, RuntimeError> {
- let consumer = create_client(&config).await?;
- let producer = create_client(&config).await?;
+ let token = if config.token.is_empty() {
+ None
+ } else {
+ Some(resolve_token(&config.token)?)
+ };
+
+ let consumer = create_client(&config, token.as_deref()).await?;
+ let producer = create_client(&config, token.as_deref()).await?;
let iggy_clients = IggyClients { producer, consumer };
Ok(iggy_clients)
}
-async fn create_client(config: &IggyConfig) -> Result<IggyClient, RuntimeError> {
+async fn create_client(
+ config: &IggyConfig,
+ token: Option<&str>,
+) -> Result<IggyClient, RuntimeError> {
let address = config.address.to_owned();
let username = config.username.to_owned();
let password = config.password.to_owned();
- let token = config.token.to_owned();
- let connection_string = if !token.is_empty() {
+ let connection_string = if let Some(token) = token {
let redacted_token = token.chars().take(3).collect::<String>();
info!("Using token: {redacted_token}*** for Iggy authentication");
format!("iggy://{token}@{address}")
@@ -87,3 +138,107 @@ async fn create_client(config: &IggyConfig) -> Result<IggyClient, RuntimeError>
client.connect().await?;
Ok(client)
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::io::Write;
+ use tempfile::NamedTempFile;
+
+ #[test]
+ fn test_expand_home_with_tilde_prefix() {
+ let path = "~/some/path";
+ let result = expand_home(path);
+
+ if let Some(home) = dirs::home_dir() {
+ assert_eq!(result, home.join("some/path"));
+ } else {
+ assert_eq!(result, PathBuf::from(path));
+ }
+ }
+
+ #[test]
+ fn test_expand_home_with_only_tilde() {
+ let path = "~";
+ let result = expand_home(path);
+
+ if let Some(home) = dirs::home_dir() {
+ assert_eq!(result, home);
+ } else {
+ assert_eq!(result, PathBuf::from(path));
+ }
+ }
+
+ #[test]
+ fn test_expand_home_without_tilde() {
+ let path = "/absolute/path";
+ let result = expand_home(path);
+ assert_eq!(result, PathBuf::from("/absolute/path"));
+ }
+
+ #[test]
+ fn test_expand_home_relative_path() {
+ let path = "relative/path";
+ let result = expand_home(path);
+ assert_eq!(result, PathBuf::from("relative/path"));
+ }
+
+ #[test]
+ fn test_resolve_token_direct_value() {
+ let token = "my-secret-token";
+ let result = resolve_token(token).unwrap();
+ assert_eq!(result, "my-secret-token");
+ }
+
+ #[test]
+ fn test_resolve_token_from_file() {
+ let mut temp_file = NamedTempFile::new().unwrap();
+ writeln!(temp_file, "token-from-file").unwrap();
+
+ let token = format!("file:{}", temp_file.path().display());
+ let result = resolve_token(&token).unwrap();
+ assert_eq!(result, "token-from-file");
+ }
+
+ #[test]
+ fn test_resolve_token_from_file_trims_whitespace() {
+ let mut temp_file = NamedTempFile::new().unwrap();
+ writeln!(temp_file, " token-with-spaces \n\n").unwrap();
+
+ let token = format!("file:{}", temp_file.path().display());
+ let result = resolve_token(&token).unwrap();
+ assert_eq!(result, "token-with-spaces");
+ }
+
+ #[test]
+ fn test_resolve_token_file_not_found() {
+ let token = "file:/nonexistent/path/to/token.txt";
+ let result = resolve_token(token);
+
+ assert!(result.is_err());
+ assert!(matches!(result, Err(RuntimeError::TokenFileNotFound(_))));
+ }
+
+ #[test]
+ fn test_resolve_token_empty_file() {
+ let temp_file = NamedTempFile::new().unwrap();
+
+ let token = format!("file:{}", temp_file.path().display());
+ let result = resolve_token(&token);
+
+ assert!(result.is_err());
+ assert!(matches!(result, Err(RuntimeError::TokenFileEmpty(_))));
+ }
+
+ #[test]
+ fn test_resolve_token_file_with_only_whitespace() {
+ let mut temp_file = NamedTempFile::new().unwrap();
+ writeln!(temp_file, " \n\t\n ").unwrap();
+
+ let token = format!("file:{}", temp_file.path().display());
+ let result = resolve_token(&token);
+
+ assert!(result.is_err());
+ assert!(matches!(result, Err(RuntimeError::TokenFileEmpty(_))));
+ }
+}