This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 8eebb4108 feat(connectors,mcp): Implement reading iggy token from file (#2565)
8eebb4108 is described below

commit 8eebb41082533cc0f1430055726458b2036a2a46
Author: Maciej Modzelewski <[email protected]>
AuthorDate: Thu Jan 15 14:32:51 2026 +0100

    feat(connectors,mcp): Implement reading iggy token from file (#2565)
---
 Cargo.lock                            |   8 +-
 DEPENDENCIES.md                       |   4 +-
 core/ai/mcp/Cargo.toml                |   6 +-
 core/ai/mcp/src/error.rs              |   6 ++
 core/ai/mcp/src/stream.rs             | 155 +++++++++++++++++++++++++++++++-
 core/connectors/runtime/Cargo.toml    |   6 +-
 core/connectors/runtime/src/error.rs  |   9 ++
 core/connectors/runtime/src/stream.rs | 165 ++++++++++++++++++++++++++++++++--
 8 files changed, 346 insertions(+), 13 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index a9e03e889..c333aaabe 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4584,7 +4584,7 @@ dependencies = [
 
 [[package]]
 name = "iggy-connectors"
-version = "0.2.1-edge.1"
+version = "0.2.1-edge.2"
 dependencies = [
  "async-trait",
  "axum",
@@ -4592,6 +4592,7 @@ dependencies = [
  "chrono",
  "clap",
  "dashmap",
+ "dirs",
  "dlopen2",
  "dotenvy",
  "figlet-rs",
@@ -4613,6 +4614,7 @@ dependencies = [
  "serde_with",
  "serde_yaml_ng",
  "strum 0.27.2",
+ "tempfile",
  "thiserror 2.0.17",
  "tokio",
  "toml 0.9.10+spec-1.1.0",
@@ -4623,10 +4625,11 @@ dependencies = [
 
 [[package]]
 name = "iggy-mcp"
-version = "0.2.1-edge.1"
+version = "0.2.1-edge.2"
 dependencies = [
  "axum",
  "axum-server",
+ "dirs",
  "dotenvy",
  "figlet-rs",
  "figment",
@@ -4636,6 +4639,7 @@ dependencies = [
  "serde",
  "serde_json",
  "strum 0.27.2",
+ "tempfile",
  "thiserror 2.0.17",
  "tokio",
  "tower-http",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index ac097d204..2da51928b 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -396,8 +396,8 @@ iggy: 0.8.1-edge.3, "Apache-2.0",
 iggy-bench: 0.3.1-edge.1, "Apache-2.0",
 iggy-bench-dashboard-server: 0.5.1-edge.1, "Apache-2.0",
 iggy-cli: 0.10.1-edge.1, "Apache-2.0",
-iggy-connectors: 0.2.1-edge.1, "Apache-2.0",
-iggy-mcp: 0.2.1-edge.1, "Apache-2.0",
+iggy-connectors: 0.2.1-edge.2, "Apache-2.0",
+iggy-mcp: 0.2.1-edge.2, "Apache-2.0",
 iggy_binary_protocol: 0.8.1-edge.1, "Apache-2.0",
 iggy_common: 0.8.1-edge.1, "Apache-2.0",
 iggy_connector_elasticsearch_sink: 0.1.0, "Apache-2.0",
diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml
index b1046754d..b5cb5b0e0 100644
--- a/core/ai/mcp/Cargo.toml
+++ b/core/ai/mcp/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy-mcp"
-version = "0.2.1-edge.1"
+version = "0.2.1-edge.2"
 description = "MCP Server for Iggy message streaming platform"
 edition = "2024"
 license = "Apache-2.0"
@@ -29,6 +29,7 @@ readme = "README.md"
 [dependencies]
 axum = { workspace = true }
 axum-server = { workspace = true }
+dirs = { workspace = true }
 dotenvy = { workspace = true }
 figlet-rs = { workspace = true }
 figment = { workspace = true }
@@ -47,3 +48,6 @@ tokio = { workspace = true }
 tower-http = { workspace = true }
 tracing = { workspace = true }
 tracing-subscriber = { workspace = true }
+
+[dev-dependencies]
+tempfile = { workspace = true }
diff --git a/core/ai/mcp/src/error.rs b/core/ai/mcp/src/error.rs
index 950056020..966397315 100644
--- a/core/ai/mcp/src/error.rs
+++ b/core/ai/mcp/src/error.rs
@@ -37,4 +37,10 @@ pub enum McpRuntimeError {
     FailedToCreateConsumerId,
     #[error("Invalid API path")]
     InvalidApiPath,
+    #[error("Token file not found: {0}")]
+    TokenFileNotFound(String),
+    #[error("Failed to read token file '{0}': {1}")]
+    TokenFileReadError(String, String),
+    #[error("Token file is empty: {0}")]
+    TokenFileEmpty(String),
 }
diff --git a/core/ai/mcp/src/stream.rs b/core/ai/mcp/src/stream.rs
index 9ef9be6f4..184c44ec8 100644
--- a/core/ai/mcp/src/stream.rs
+++ b/core/ai/mcp/src/stream.rs
@@ -19,15 +19,62 @@
 
 use crate::{configs::IggyConfig, error::McpRuntimeError};
 use iggy::prelude::{Client, IggyClient, IggyClientBuilder};
+use std::path::PathBuf;
 use tracing::{error, info};
 
+const TOKEN_FILE_PREFIX: &str = "file:";
+
+fn expand_home(path: &str) -> PathBuf {
+    if let Some(rest) = path.strip_prefix("~/") {
+        if let Some(home) = dirs::home_dir() {
+            return home.join(rest);
+        }
+    } else if path == "~"
+        && let Some(home) = dirs::home_dir()
+    {
+        return home;
+    }
+    PathBuf::from(path)
+}
+
+fn resolve_token(token: &str) -> Result<String, McpRuntimeError> {
+    if let Some(path) = token.strip_prefix(TOKEN_FILE_PREFIX) {
+        let file_path = expand_home(path);
+
+        if !file_path.exists() {
+            error!("Token file does not exist: {}", path);
+            return Err(McpRuntimeError::TokenFileNotFound(path.to_string()));
+        }
+
+        let content = std::fs::read_to_string(&file_path).map_err(|e| {
+            error!("Failed to read token file '{}': {}", path, e);
+            McpRuntimeError::TokenFileReadError(path.to_string(), e.to_string())
+        })?;
+
+        let trimmed = content.trim().to_string();
+
+        if trimmed.is_empty() {
+            error!("Token file is empty: {}", path);
+            return Err(McpRuntimeError::TokenFileEmpty(path.to_string()));
+        }
+
+        Ok(trimmed)
+    } else {
+        Ok(token.to_string())
+    }
+}
+
 pub async fn init(config: IggyConfig) -> Result<IggyClient, McpRuntimeError> {
     let address = config.address;
     let username = config.username;
     let password = config.password;
-    let token = config.token;
+    let token = if config.token.is_empty() {
+        None
+    } else {
+        Some(resolve_token(&config.token)?)
+    };
 
-    let connection_string = if !token.is_empty() {
+    let connection_string = if let Some(token) = token {
         let redacted_token = token.chars().take(3).collect::<String>();
         info!("Using token: {redacted_token}*** for Iggy authentication");
         format!("iggy://{token}@{address}")
@@ -73,3 +120,107 @@ pub async fn init(config: IggyConfig) -> Result<IggyClient, McpRuntimeError> {
     client.connect().await?;
     Ok(client)
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::io::Write;
+    use tempfile::NamedTempFile;
+
+    #[test]
+    fn test_expand_home_with_tilde_prefix() {
+        let path = "~/some/path";
+        let result = expand_home(path);
+
+        if let Some(home) = dirs::home_dir() {
+            assert_eq!(result, home.join("some/path"));
+        } else {
+            assert_eq!(result, PathBuf::from(path));
+        }
+    }
+
+    #[test]
+    fn test_expand_home_with_only_tilde() {
+        let path = "~";
+        let result = expand_home(path);
+
+        if let Some(home) = dirs::home_dir() {
+            assert_eq!(result, home);
+        } else {
+            assert_eq!(result, PathBuf::from(path));
+        }
+    }
+
+    #[test]
+    fn test_expand_home_without_tilde() {
+        let path = "/absolute/path";
+        let result = expand_home(path);
+        assert_eq!(result, PathBuf::from("/absolute/path"));
+    }
+
+    #[test]
+    fn test_expand_home_relative_path() {
+        let path = "relative/path";
+        let result = expand_home(path);
+        assert_eq!(result, PathBuf::from("relative/path"));
+    }
+
+    #[test]
+    fn test_resolve_token_direct_value() {
+        let token = "my-secret-token";
+        let result = resolve_token(token).unwrap();
+        assert_eq!(result, "my-secret-token");
+    }
+
+    #[test]
+    fn test_resolve_token_from_file() {
+        let mut temp_file = NamedTempFile::new().unwrap();
+        writeln!(temp_file, "token-from-file").unwrap();
+
+        let token = format!("file:{}", temp_file.path().display());
+        let result = resolve_token(&token).unwrap();
+        assert_eq!(result, "token-from-file");
+    }
+
+    #[test]
+    fn test_resolve_token_from_file_trims_whitespace() {
+        let mut temp_file = NamedTempFile::new().unwrap();
+        writeln!(temp_file, "  token-with-spaces  \n\n").unwrap();
+
+        let token = format!("file:{}", temp_file.path().display());
+        let result = resolve_token(&token).unwrap();
+        assert_eq!(result, "token-with-spaces");
+    }
+
+    #[test]
+    fn test_resolve_token_file_not_found() {
+        let token = "file:/nonexistent/path/to/token.txt";
+        let result = resolve_token(token);
+
+        assert!(result.is_err());
+        assert!(matches!(result, Err(McpRuntimeError::TokenFileNotFound(_))));
+    }
+
+    #[test]
+    fn test_resolve_token_empty_file() {
+        let temp_file = NamedTempFile::new().unwrap();
+
+        let token = format!("file:{}", temp_file.path().display());
+        let result = resolve_token(&token);
+
+        assert!(result.is_err());
+        assert!(matches!(result, Err(McpRuntimeError::TokenFileEmpty(_))));
+    }
+
+    #[test]
+    fn test_resolve_token_file_with_only_whitespace() {
+        let mut temp_file = NamedTempFile::new().unwrap();
+        writeln!(temp_file, "   \n\t\n   ").unwrap();
+
+        let token = format!("file:{}", temp_file.path().display());
+        let result = resolve_token(&token);
+
+        assert!(result.is_err());
+        assert!(matches!(result, Err(McpRuntimeError::TokenFileEmpty(_))));
+    }
+}
diff --git a/core/connectors/runtime/Cargo.toml b/core/connectors/runtime/Cargo.toml
index 7c0b2ff43..2f2d18971 100644
--- a/core/connectors/runtime/Cargo.toml
+++ b/core/connectors/runtime/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy-connectors"
-version = "0.2.1-edge.1"
+version = "0.2.1-edge.2"
 description = "Connectors runtime for Iggy message streaming platform"
 edition = "2024"
 license = "Apache-2.0"
@@ -35,6 +35,7 @@ axum-server = { workspace = true }
 chrono = { workspace = true }
 clap = { workspace = true }
 dashmap = { workspace = true }
+dirs = { workspace = true }
 dlopen2 = { workspace = true }
 dotenvy = { workspace = true }
 figlet-rs = { workspace = true }
@@ -62,3 +63,6 @@ toml = { workspace = true }
 tower-http = { workspace = true }
 tracing = { workspace = true }
 tracing-subscriber = { workspace = true }
+
+[dev-dependencies]
+tempfile = { workspace = true }
diff --git a/core/connectors/runtime/src/error.rs b/core/connectors/runtime/src/error.rs
index b392b60b6..0f6705faf 100644
--- a/core/connectors/runtime/src/error.rs
+++ b/core/connectors/runtime/src/error.rs
@@ -57,6 +57,12 @@ pub enum RuntimeError {
     IoError(#[from] std::io::Error),
     #[error("HTTP request failed: {0}")]
     HttpRequestFailed(String),
+    #[error("Token file not found: {0}")]
+    TokenFileNotFound(String),
+    #[error("Failed to read token file '{0}': {1}")]
+    TokenFileReadError(String, String),
+    #[error("Token file is empty: {0}")]
+    TokenFileEmpty(String),
 }
 
 impl RuntimeError {
@@ -69,6 +75,9 @@ impl RuntimeError {
             RuntimeError::MissingIggyCredentials => "invalid_configuration",
             RuntimeError::InvalidConfiguration(_) => "invalid_configuration",
             RuntimeError::HttpRequestFailed(_) => "http_request_failed",
+            RuntimeError::TokenFileNotFound(_) => "invalid_configuration",
+            RuntimeError::TokenFileReadError(_, _) => "invalid_configuration",
+            RuntimeError::TokenFileEmpty(_) => "invalid_configuration",
             _ => "error",
         }
     }
diff --git a/core/connectors/runtime/src/stream.rs b/core/connectors/runtime/src/stream.rs
index 79a5b8b98..c2732d597 100644
--- a/core/connectors/runtime/src/stream.rs
+++ b/core/connectors/runtime/src/stream.rs
@@ -18,30 +18,81 @@
  */
 
 use iggy::prelude::{Client, IggyClient, IggyClientBuilder};
+use std::path::PathBuf;
 use tracing::{error, info};
 
 use crate::configs::runtime::IggyConfig;
 use crate::error::RuntimeError;
 
+const TOKEN_FILE_PREFIX: &str = "file:";
+
+fn expand_home(path: &str) -> PathBuf {
+    if let Some(rest) = path.strip_prefix("~/") {
+        if let Some(home) = dirs::home_dir() {
+            return home.join(rest);
+        }
+    } else if path == "~"
+        && let Some(home) = dirs::home_dir()
+    {
+        return home;
+    }
+    PathBuf::from(path)
+}
+
+fn resolve_token(token: &str) -> Result<String, RuntimeError> {
+    if let Some(path) = token.strip_prefix(TOKEN_FILE_PREFIX) {
+        let file_path = expand_home(path);
+
+        if !file_path.exists() {
+            error!("Token file does not exist: {}", path);
+            return Err(RuntimeError::TokenFileNotFound(path.to_string()));
+        }
+
+        let content = std::fs::read_to_string(&file_path).map_err(|e| {
+            error!("Failed to read token file '{}': {}", path, e);
+            RuntimeError::TokenFileReadError(path.to_string(), e.to_string())
+        })?;
+
+        let trimmed = content.trim().to_string();
+
+        if trimmed.is_empty() {
+            error!("Token file is empty: {}", path);
+            return Err(RuntimeError::TokenFileEmpty(path.to_string()));
+        }
+
+        Ok(trimmed)
+    } else {
+        Ok(token.to_string())
+    }
+}
+
 pub struct IggyClients {
     pub producer: IggyClient,
     pub consumer: IggyClient,
 }
 
 pub async fn init(config: IggyConfig) -> Result<IggyClients, RuntimeError> {
-    let consumer = create_client(&config).await?;
-    let producer = create_client(&config).await?;
+    let token = if config.token.is_empty() {
+        None
+    } else {
+        Some(resolve_token(&config.token)?)
+    };
+
+    let consumer = create_client(&config, token.as_deref()).await?;
+    let producer = create_client(&config, token.as_deref()).await?;
     let iggy_clients = IggyClients { producer, consumer };
     Ok(iggy_clients)
 }
 
-async fn create_client(config: &IggyConfig) -> Result<IggyClient, RuntimeError> {
+async fn create_client(
+    config: &IggyConfig,
+    token: Option<&str>,
+) -> Result<IggyClient, RuntimeError> {
     let address = config.address.to_owned();
     let username = config.username.to_owned();
     let password = config.password.to_owned();
-    let token = config.token.to_owned();
 
-    let connection_string = if !token.is_empty() {
+    let connection_string = if let Some(token) = token {
         let redacted_token = token.chars().take(3).collect::<String>();
         info!("Using token: {redacted_token}*** for Iggy authentication");
         format!("iggy://{token}@{address}")
@@ -87,3 +138,107 @@ async fn create_client(config: &IggyConfig) -> Result<IggyClient, RuntimeError>
     client.connect().await?;
     Ok(client)
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::io::Write;
+    use tempfile::NamedTempFile;
+
+    #[test]
+    fn test_expand_home_with_tilde_prefix() {
+        let path = "~/some/path";
+        let result = expand_home(path);
+
+        if let Some(home) = dirs::home_dir() {
+            assert_eq!(result, home.join("some/path"));
+        } else {
+            assert_eq!(result, PathBuf::from(path));
+        }
+    }
+
+    #[test]
+    fn test_expand_home_with_only_tilde() {
+        let path = "~";
+        let result = expand_home(path);
+
+        if let Some(home) = dirs::home_dir() {
+            assert_eq!(result, home);
+        } else {
+            assert_eq!(result, PathBuf::from(path));
+        }
+    }
+
+    #[test]
+    fn test_expand_home_without_tilde() {
+        let path = "/absolute/path";
+        let result = expand_home(path);
+        assert_eq!(result, PathBuf::from("/absolute/path"));
+    }
+
+    #[test]
+    fn test_expand_home_relative_path() {
+        let path = "relative/path";
+        let result = expand_home(path);
+        assert_eq!(result, PathBuf::from("relative/path"));
+    }
+
+    #[test]
+    fn test_resolve_token_direct_value() {
+        let token = "my-secret-token";
+        let result = resolve_token(token).unwrap();
+        assert_eq!(result, "my-secret-token");
+    }
+
+    #[test]
+    fn test_resolve_token_from_file() {
+        let mut temp_file = NamedTempFile::new().unwrap();
+        writeln!(temp_file, "token-from-file").unwrap();
+
+        let token = format!("file:{}", temp_file.path().display());
+        let result = resolve_token(&token).unwrap();
+        assert_eq!(result, "token-from-file");
+    }
+
+    #[test]
+    fn test_resolve_token_from_file_trims_whitespace() {
+        let mut temp_file = NamedTempFile::new().unwrap();
+        writeln!(temp_file, "  token-with-spaces  \n\n").unwrap();
+
+        let token = format!("file:{}", temp_file.path().display());
+        let result = resolve_token(&token).unwrap();
+        assert_eq!(result, "token-with-spaces");
+    }
+
+    #[test]
+    fn test_resolve_token_file_not_found() {
+        let token = "file:/nonexistent/path/to/token.txt";
+        let result = resolve_token(token);
+
+        assert!(result.is_err());
+        assert!(matches!(result, Err(RuntimeError::TokenFileNotFound(_))));
+    }
+
+    #[test]
+    fn test_resolve_token_empty_file() {
+        let temp_file = NamedTempFile::new().unwrap();
+
+        let token = format!("file:{}", temp_file.path().display());
+        let result = resolve_token(&token);
+
+        assert!(result.is_err());
+        assert!(matches!(result, Err(RuntimeError::TokenFileEmpty(_))));
+    }
+
+    #[test]
+    fn test_resolve_token_file_with_only_whitespace() {
+        let mut temp_file = NamedTempFile::new().unwrap();
+        writeln!(temp_file, "   \n\t\n   ").unwrap();
+
+        let token = format!("file:{}", temp_file.path().display());
+        let result = resolve_token(&token);
+
+        assert!(result.is_err());
+        assert!(matches!(result, Err(RuntimeError::TokenFileEmpty(_))));
+    }
+}

Reply via email to