This is an automated email from the ASF dual-hosted git repository.

maciej pushed a commit to branch connectors-mcp-token-file
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 912778da5acfd30e124a7405144d72bfa3b0c136
Author: Maciej Modzelewski <[email protected]>
AuthorDate: Thu Jan 15 11:51:24 2026 +0100

    feat(connectors,mcp): Implement reading iggy token from file
---
 Cargo.lock                            |  2 ++
 core/ai/mcp/Cargo.toml                |  1 +
 core/ai/mcp/src/error.rs              |  6 ++++
 core/ai/mcp/src/stream.rs             | 51 +++++++++++++++++++++++++++--
 core/connectors/runtime/Cargo.toml    |  1 +
 core/connectors/runtime/src/error.rs  |  9 ++++++
 core/connectors/runtime/src/stream.rs | 61 ++++++++++++++++++++++++++++++++---
 7 files changed, 124 insertions(+), 7 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index a9e03e889..cb266009f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4592,6 +4592,7 @@ dependencies = [
  "chrono",
  "clap",
  "dashmap",
+ "dirs",
  "dlopen2",
  "dotenvy",
  "figlet-rs",
@@ -4627,6 +4628,7 @@ version = "0.2.1-edge.1"
 dependencies = [
  "axum",
  "axum-server",
+ "dirs",
  "dotenvy",
  "figlet-rs",
  "figment",
diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml
index b1046754d..164d940f9 100644
--- a/core/ai/mcp/Cargo.toml
+++ b/core/ai/mcp/Cargo.toml
@@ -29,6 +29,7 @@ readme = "README.md"
 [dependencies]
 axum = { workspace = true }
 axum-server = { workspace = true }
+dirs = { workspace = true }
 dotenvy = { workspace = true }
 figlet-rs = { workspace = true }
 figment = { workspace = true }
diff --git a/core/ai/mcp/src/error.rs b/core/ai/mcp/src/error.rs
index 950056020..966397315 100644
--- a/core/ai/mcp/src/error.rs
+++ b/core/ai/mcp/src/error.rs
@@ -37,4 +37,10 @@ pub enum McpRuntimeError {
     FailedToCreateConsumerId,
     #[error("Invalid API path")]
     InvalidApiPath,
+    #[error("Token file not found: {0}")]
+    TokenFileNotFound(String),
+    #[error("Failed to read token file '{0}': {1}")]
+    TokenFileReadError(String, String),
+    #[error("Token file is empty: {0}")]
+    TokenFileEmpty(String),
 }
diff --git a/core/ai/mcp/src/stream.rs b/core/ai/mcp/src/stream.rs
index 9ef9be6f4..aa5a8c858 100644
--- a/core/ai/mcp/src/stream.rs
+++ b/core/ai/mcp/src/stream.rs
@@ -19,15 +19,62 @@
 
 use crate::{configs::IggyConfig, error::McpRuntimeError};
 use iggy::prelude::{Client, IggyClient, IggyClientBuilder};
+use std::path::PathBuf;
 use tracing::{error, info};
 
+const TOKEN_FILE_PREFIX: &str = "file:";
+
+fn expand_home(path: &str) -> PathBuf {
+    if let Some(rest) = path.strip_prefix("~/") {
+        if let Some(home) = dirs::home_dir() {
+            return home.join(rest);
+        }
+    } else if path == "~"
+        && let Some(home) = dirs::home_dir()
+    {
+        return home;
+    }
+    PathBuf::from(path)
+}
+
+fn resolve_token(token: &str) -> Result<String, McpRuntimeError> {
+    if let Some(path) = token.strip_prefix(TOKEN_FILE_PREFIX) {
+        let file_path = expand_home(path);
+
+        if !file_path.exists() {
+            error!("Token file does not exist: {}", path);
+            return Err(McpRuntimeError::TokenFileNotFound(path.to_string()));
+        }
+
+        let content = std::fs::read_to_string(&file_path).map_err(|e| {
+            error!("Failed to read token file '{}': {}", path, e);
+            McpRuntimeError::TokenFileReadError(path.to_string(), 
e.to_string())
+        })?;
+
+        let trimmed = content.trim().to_string();
+
+        if trimmed.is_empty() {
+            error!("Token file is empty: {}", path);
+            return Err(McpRuntimeError::TokenFileEmpty(path.to_string()));
+        }
+
+        Ok(trimmed)
+    } else {
+        Ok(token.to_string())
+    }
+}
+
 pub async fn init(config: IggyConfig) -> Result<IggyClient, McpRuntimeError> {
     let address = config.address;
     let username = config.username;
     let password = config.password;
-    let token = config.token;
+    let token = if config.token.is_empty() {
+        None
+    } else {
+        Some(resolve_token(&config.token)?)
+    };
 
-    let connection_string = if !token.is_empty() {
+    let connection_string = if let Some(token) = token {
         let redacted_token = token.chars().take(3).collect::<String>();
         info!("Using token: {redacted_token}*** for Iggy authentication");
         format!("iggy://{token}@{address}")
diff --git a/core/connectors/runtime/Cargo.toml 
b/core/connectors/runtime/Cargo.toml
index 7c0b2ff43..28cbf24c1 100644
--- a/core/connectors/runtime/Cargo.toml
+++ b/core/connectors/runtime/Cargo.toml
@@ -35,6 +35,7 @@ axum-server = { workspace = true }
 chrono = { workspace = true }
 clap = { workspace = true }
 dashmap = { workspace = true }
+dirs = { workspace = true }
 dlopen2 = { workspace = true }
 dotenvy = { workspace = true }
 figlet-rs = { workspace = true }
diff --git a/core/connectors/runtime/src/error.rs 
b/core/connectors/runtime/src/error.rs
index b392b60b6..0f6705faf 100644
--- a/core/connectors/runtime/src/error.rs
+++ b/core/connectors/runtime/src/error.rs
@@ -57,6 +57,12 @@ pub enum RuntimeError {
     IoError(#[from] std::io::Error),
     #[error("HTTP request failed: {0}")]
     HttpRequestFailed(String),
+    #[error("Token file not found: {0}")]
+    TokenFileNotFound(String),
+    #[error("Failed to read token file '{0}': {1}")]
+    TokenFileReadError(String, String),
+    #[error("Token file is empty: {0}")]
+    TokenFileEmpty(String),
 }
 
 impl RuntimeError {
@@ -69,6 +75,9 @@ impl RuntimeError {
             RuntimeError::MissingIggyCredentials => "invalid_configuration",
             RuntimeError::InvalidConfiguration(_) => "invalid_configuration",
             RuntimeError::HttpRequestFailed(_) => "http_request_failed",
+            RuntimeError::TokenFileNotFound(_) => "invalid_configuration",
+            RuntimeError::TokenFileReadError(_, _) => "invalid_configuration",
+            RuntimeError::TokenFileEmpty(_) => "invalid_configuration",
             _ => "error",
         }
     }
diff --git a/core/connectors/runtime/src/stream.rs 
b/core/connectors/runtime/src/stream.rs
index 79a5b8b98..e8d1b20d5 100644
--- a/core/connectors/runtime/src/stream.rs
+++ b/core/connectors/runtime/src/stream.rs
@@ -18,30 +18,81 @@
  */
 
 use iggy::prelude::{Client, IggyClient, IggyClientBuilder};
+use std::path::PathBuf;
 use tracing::{error, info};
 
 use crate::configs::runtime::IggyConfig;
 use crate::error::RuntimeError;
 
+const TOKEN_FILE_PREFIX: &str = "file:";
+
+fn expand_home(path: &str) -> PathBuf {
+    if let Some(rest) = path.strip_prefix("~/") {
+        if let Some(home) = dirs::home_dir() {
+            return home.join(rest);
+        }
+    } else if path == "~"
+        && let Some(home) = dirs::home_dir()
+    {
+        return home;
+    }
+    PathBuf::from(path)
+}
+
+fn resolve_token(token: &str) -> Result<String, RuntimeError> {
+    if let Some(path) = token.strip_prefix(TOKEN_FILE_PREFIX) {
+        let file_path = expand_home(path);
+
+        if !file_path.exists() {
+            error!("Token file does not exist: {}", path);
+            return Err(RuntimeError::TokenFileNotFound(path.to_string()));
+        }
+
+        let content = std::fs::read_to_string(&file_path).map_err(|e| {
+            error!("Failed to read token file '{}': {}", path, e);
+            RuntimeError::TokenFileReadError(path.to_string(), e.to_string())
+        })?;
+
+        let trimmed = content.trim().to_string();
+
+        if trimmed.is_empty() {
+            error!("Token file is empty: {}", path);
+            return Err(RuntimeError::TokenFileEmpty(path.to_string()));
+        }
+
+        Ok(trimmed)
+    } else {
+        Ok(token.to_string())
+    }
+}
+
 pub struct IggyClients {
     pub producer: IggyClient,
     pub consumer: IggyClient,
 }
 
 pub async fn init(config: IggyConfig) -> Result<IggyClients, RuntimeError> {
-    let consumer = create_client(&config).await?;
-    let producer = create_client(&config).await?;
+    let token = if config.token.is_empty() {
+        None
+    } else {
+        Some(resolve_token(&config.token)?)
+    };
+
+    let consumer = create_client(&config, token.as_deref()).await?;
+    let producer = create_client(&config, token.as_deref()).await?;
     let iggy_clients = IggyClients { producer, consumer };
     Ok(iggy_clients)
 }
 
-async fn create_client(config: &IggyConfig) -> Result<IggyClient, 
RuntimeError> {
+async fn create_client(
+    config: &IggyConfig,
+    token: Option<&str>,
+) -> Result<IggyClient, RuntimeError> {
     let address = config.address.to_owned();
     let username = config.username.to_owned();
     let password = config.password.to_owned();
-    let token = config.token.to_owned();
 
-    let connection_string = if !token.is_empty() {
+    let connection_string = if let Some(token) = token {
         let redacted_token = token.chars().take(3).collect::<String>();
         info!("Using token: {redacted_token}*** for Iggy authentication");
         format!("iggy://{token}@{address}")

Reply via email to