This is an automated email from the ASF dual-hosted git repository. piotr pushed a commit to branch mcp in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 1a06612a6ff5703b48b1602cce8012c916c28131 Author: spetz <[email protected]> AuthorDate: Thu Jul 10 21:57:37 2025 +0200 feat(mcp): add Iggy MCP server --- Cargo.lock | 112 +++++++++++++++++++++++++++++++++- Cargo.toml | 1 + core/mcp/Cargo.toml | 22 +++++++ core/mcp/config.toml | 33 ++++++++++ core/mcp/src/configs.rs | 60 +++++++++++++++++++ core/mcp/src/error.rs | 29 +++++++++ core/mcp/src/main.rs | 143 ++++++++++++++++++++++++++++++++++++++++++++ core/mcp/src/service/mod.rs | 60 +++++++++++++++++++ 8 files changed, 459 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index a89b2ecf..aa59392c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3808,6 +3808,23 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "iggy-mcp" +version = "0.1.0" +dependencies = [ + "axum 0.8.4", + "config", + "dotenvy", + "figlet-rs", + "rmcp", + "serde", + "strum", + "thiserror 2.0.12", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "iggy_binary_protocol" version = "0.7.0" @@ -6297,6 +6314,50 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "rmcp" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37f2048a81a7ff7e8ef6bc5abced70c3d9114c8f03d85d7aaaafd9fd04f12e9e" +dependencies = [ + "axum 0.8.4", + "base64 0.22.1", + "bytes", + "chrono", + "futures", + "http 1.3.1", + "http-body", + "http-body-util", + "paste", + "pin-project-lite", + "rand 0.9.1", + "rmcp-macros", + "schemars 0.8.22", + "serde", + "serde_json", + "sse-stream", + "thiserror 2.0.12", + "tokio", + "tokio-stream", + "tokio-util", + "tower-service", + "tracing", + "uuid", +] + +[[package]] +name = "rmcp-macros" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72398e694b9f6dbb5de960cf158c8699e6a1854cb5bbaac7de0646b2005763c4" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "serde_json", + "syn 2.0.104", +] + [[package]] name = "ron" version = "0.8.1" @@ -6582,6 +6643,19 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "schemars" +version = "0.8.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fbf2ae1b8bc8e02df939598064d22402220cd5bbcca1c76f7d6a310974d5615" +dependencies = [ + "chrono", + "dyn-clone", + "schemars_derive", + "serde", + "serde_json", +] + [[package]] name = "schemars" version = "0.9.0" @@ -6594,6 +6668,18 @@ dependencies = [ "serde_json", ] +[[package]] +name = "schemars_derive" +version = "0.8.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32e265784ad618884abaea0600a9adf15393368d840e0222d101a072f3f7534d" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn 2.0.104", +] + [[package]] name = "scoped-tls" version = "1.0.1" @@ -6713,6 +6799,17 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "serde_derive_internals" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "serde_json" version = "1.0.140" @@ -6767,7 +6864,7 @@ dependencies = [ "hex", "indexmap 1.9.3", "indexmap 2.9.0", - "schemars", + "schemars 0.9.0", "serde", "serde_derive", "serde_json", @@ -7066,6 +7163,19 @@ dependencies = [ "lock_api", ] +[[package]] +name = "sse-stream" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb4dc4d33c68ec1f27d386b5610a351922656e1fdf5c05bbaad930cd1519479a" +dependencies = [ + "bytes", + "futures-util", + "http-body", + "http-body-util", + "pin-project-lite", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" diff --git a/Cargo.toml b/Cargo.toml index f910068b..093f98e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ members = [ "core/connectors/sinks/stdout_sink", "core/connectors/sources/random_source", "core/integration", + "core/mcp", "core/sdk", "core/server", "core/tools", diff --git a/core/mcp/Cargo.toml b/core/mcp/Cargo.toml new file mode 100644 index 00000000..86941174 --- /dev/null +++ b/core/mcp/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "iggy-mcp" +version = "0.1.0" +edition = "2024" + +[dependencies] +axum = { workspace = true } +config = { workspace = true } +dotenvy = { workspace = true } +figlet-rs = { workspace = true } +rmcp = { version = "0.2.1", features = [ + "server", + "transport-io", + "transport-sse-server", + "transport-streamable-http-server", +] } +serde = { workspace = true } +strum = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } diff --git a/core/mcp/config.toml b/core/mcp/config.toml new file mode 100644 index 00000000..ab06c0ca --- /dev/null +++ b/core/mcp/config.toml @@ -0,0 +1,33 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +transport = "stdio" + +[http_api] # Optional HTTP API configuration +address = "127.0.0.1:8082" +# api_key = "secret" # Optional API key for authentication to be passed as `api-key` header + +[http_api.tls] # Optional TLS configuration for HTTP API +enabled = false +cert_file = "core/certs/iggy_cert.pem" +key_file = "core/certs/iggy_key.pem" + +[iggy] +address = "localhost:8090" +username = "iggy" +password = "iggy" +# token = "secret" # Personal Access Token (PAT) can be used instead of username and password diff --git a/core/mcp/src/configs.rs b/core/mcp/src/configs.rs new file mode 100644 index 00000000..89908a4e --- /dev/null +++ b/core/mcp/src/configs.rs @@ -0,0 +1,60 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use serde::{Deserialize, Serialize}; +use strum::Display; + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct McpServerConfig { + pub http_api: Option<HttpApiConfig>, + pub iggy: IggyConfig, + pub transport: McpTransport, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct IggyConfig { + pub address: String, + pub username: Option<String>, + pub password: Option<String>, + pub token: Option<String>, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct HttpApiConfig { + pub address: String, + pub api_key: Option<String>, + pub tls: Option<HttpTlsConfig>, +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct HttpTlsConfig { + pub enabled: bool, + pub cert_file: String, + pub key_file: String, +} + +#[derive(Clone, Copy, Debug, Default, Display, PartialEq, Eq, Serialize, Deserialize)] +#[strum(serialize_all = "snake_case")] +#[serde(rename_all = "snake_case")] +pub enum McpTransport { + #[default] + #[strum(to_string = "http")] + Http, + #[strum(to_string = "stdio")] + Stdio, +} diff --git a/core/mcp/src/error.rs b/core/mcp/src/error.rs new file mode 100644 index 00000000..592e6db1 --- /dev/null +++ b/core/mcp/src/error.rs @@ -0,0 +1,29 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum McpRuntimeError { + #[error("Failed to create service")] + FailedToCreateService, + #[error("Missing configuration")] + MissingConfig, + #[error("Failed to start HTTP server")] + FailedToStartHttpServer, +} diff --git a/core/mcp/src/main.rs b/core/mcp/src/main.rs new file mode 100644 index 00000000..fba53bac --- /dev/null +++ b/core/mcp/src/main.rs @@ -0,0 +1,143 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use config::{Config, Environment, File}; +use configs::{McpServerConfig, McpTransport}; +use dotenvy::dotenv; +use error::McpRuntimeError; +use figlet_rs::FIGfont; +use rmcp::{ + ServiceExt, + transport::{ + StreamableHttpService, stdio, streamable_http_server::session::local::LocalSessionManager, + }, +}; +use service::IggyService; +use std::env; +use tracing::{error, info}; +use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, util::SubscriberInitExt}; + +mod configs; +mod error; +mod service; + +#[tokio::main] +async fn main() -> Result<(), McpRuntimeError> { + let standard_font = FIGfont::standard().unwrap(); + let figure = standard_font.convert("Iggy MCP Server"); + eprintln!("{}", figure.unwrap()); + + if let Ok(env_path) = std::env::var("IGGY_MCP_ENV_PATH") { + if dotenvy::from_path(&env_path).is_ok() { + eprintln!("Loaded environment variables from path: {env_path}"); + } + } else if let Ok(path) = dotenv() { + eprintln!( + "Loaded environment variables from .env file at path: {}", + path.display() + ); + } + + let config_path = env::var("IGGY_MCP_CONFIG_PATH").unwrap_or_else(|_| "config".to_string()); + + eprintln!("Loading configuration from: {config_path}"); + + let builder = Config::builder() + .add_source(File::with_name(&config_path)) + .add_source(Environment::with_prefix("IGGY_MCP").separator("_")); + + let config: McpServerConfig = builder + .build() + .expect("Failed to build runtime config") + .try_deserialize() + .expect("Failed to deserialize runtime config"); + + let transport = config.transport; + if transport == McpTransport::Stdio { + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("DEBUG"))) + .with_writer(std::io::stderr) + .with_ansi(false) + .init(); + } else { + Registry::default() + .with(tracing_subscriber::fmt::layer()) + .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) + .init(); + } + + info!("Starting Iggy MCP Server, transport: {transport}..."); + + if transport == McpTransport::Stdio { + let Ok(service) = IggyService::new().serve(stdio()).await.inspect_err(|e| { + error!("Serving error: {:?}", e); + }) else { + error!("Failed to create service"); + return Err(McpRuntimeError::FailedToCreateService); + }; + + if let Err(error) = service.waiting().await { + error!("waiting error: {:?}", error); + } + } else { + let Some(http_config) = config.http_api else { + error!("HTTP API configuration not found"); + return Err(McpRuntimeError::MissingConfig); + }; + + let service = StreamableHttpService::new( + || Ok(IggyService::new()), + LocalSessionManager::default().into(), + Default::default(), + ); + + let router = axum::Router::new().nest_service("/mcp", service); + let tcp_listener = tokio::net::TcpListener::bind(&http_config.address) + .await + .map_err(|error| { + error!("Failed to bind TCP listener: {:?}", error); + McpRuntimeError::FailedToStartHttpServer + })?; + info!("HTTP API listening on: {}", http_config.address); + let _ = axum::serve(tcp_listener, router) + .with_graceful_shutdown(async { tokio::signal::ctrl_c().await.unwrap() }) + .await; + } + + #[cfg(unix)] + let (mut ctrl_c, mut sigterm) = { + use tokio::signal::unix::{SignalKind, signal}; + ( + signal(SignalKind::interrupt()).expect("Failed to create SIGINT signal"), + signal(SignalKind::terminate()).expect("Failed to create SIGTERM signal"), + ) + }; + + #[cfg(unix)] + tokio::select! { + _ = ctrl_c.recv() => { + info!("Received SIGINT. Shutting down Iggy MCP Server..."); + }, + _ = sigterm.recv() => { + info!("Received SIGTERM. Shutting down Iggy MCP Server..."); + } + } + + info!("Iggy MCP Server stopped successfully"); + Ok(()) +} diff --git a/core/mcp/src/service/mod.rs b/core/mcp/src/service/mod.rs new file mode 100644 index 00000000..77dad561 --- /dev/null +++ b/core/mcp/src/service/mod.rs @@ -0,0 +1,60 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use rmcp::{ + ServerHandler, + handler::server::{router::tool::ToolRouter, tool::Parameters}, + model::{ServerCapabilities, ServerInfo}, + schemars, tool, tool_handler, tool_router, +}; + +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +pub struct HelloRequest { + #[schemars(description = "your name")] + pub name: String, +} + +#[derive(Debug, Clone)] +pub struct IggyService { + tool_router: ToolRouter<Self>, +} + +#[tool_router] +impl IggyService { + pub fn new() -> Self { + Self { + tool_router: Self::tool_router(), + } + } + + #[tool(description = "Say hello")] + fn hello(&self, Parameters(HelloRequest { name }): Parameters<HelloRequest>) -> String { + format!("Hello, {name}!") + } +} + +#[tool_handler] +impl ServerHandler for IggyService { + fn get_info(&self) -> ServerInfo { + ServerInfo { + instructions: Some("Iggy service".into()), + capabilities: ServerCapabilities::builder().enable_tools().build(), + ..Default::default() + } + } +}
