This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch mcp
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 1a06612a6ff5703b48b1602cce8012c916c28131
Author: spetz <[email protected]>
AuthorDate: Thu Jul 10 21:57:37 2025 +0200

    feat(mcp): add Iggy MCP server
---
 Cargo.lock                  | 112 +++++++++++++++++++++++++++++++++-
 Cargo.toml                  |   1 +
 core/mcp/Cargo.toml         |  22 +++++++
 core/mcp/config.toml        |  33 ++++++++++
 core/mcp/src/configs.rs     |  60 +++++++++++++++++++
 core/mcp/src/error.rs       |  29 +++++++++
 core/mcp/src/main.rs        | 143 ++++++++++++++++++++++++++++++++++++++++++++
 core/mcp/src/service/mod.rs |  60 +++++++++++++++++++
 8 files changed, 459 insertions(+), 1 deletion(-)

diff --git a/Cargo.lock b/Cargo.lock
index a89b2ecf..aa59392c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3808,6 +3808,23 @@ dependencies = [
  "tracing-subscriber",
 ]
 
+[[package]]
+name = "iggy-mcp"
+version = "0.1.0"
+dependencies = [
+ "axum 0.8.4",
+ "config",
+ "dotenvy",
+ "figlet-rs",
+ "rmcp",
+ "serde",
+ "strum",
+ "thiserror 2.0.12",
+ "tokio",
+ "tracing",
+ "tracing-subscriber",
+]
+
 [[package]]
 name = "iggy_binary_protocol"
 version = "0.7.0"
@@ -6297,6 +6314,50 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "rmcp"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "37f2048a81a7ff7e8ef6bc5abced70c3d9114c8f03d85d7aaaafd9fd04f12e9e"
+dependencies = [
+ "axum 0.8.4",
+ "base64 0.22.1",
+ "bytes",
+ "chrono",
+ "futures",
+ "http 1.3.1",
+ "http-body",
+ "http-body-util",
+ "paste",
+ "pin-project-lite",
+ "rand 0.9.1",
+ "rmcp-macros",
+ "schemars 0.8.22",
+ "serde",
+ "serde_json",
+ "sse-stream",
+ "thiserror 2.0.12",
+ "tokio",
+ "tokio-stream",
+ "tokio-util",
+ "tower-service",
+ "tracing",
+ "uuid",
+]
+
+[[package]]
+name = "rmcp-macros"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "72398e694b9f6dbb5de960cf158c8699e6a1854cb5bbaac7de0646b2005763c4"
+dependencies = [
+ "darling",
+ "proc-macro2",
+ "quote",
+ "serde_json",
+ "syn 2.0.104",
+]
+
 [[package]]
 name = "ron"
 version = "0.8.1"
@@ -6582,6 +6643,19 @@ dependencies = [
  "windows-sys 0.59.0",
 ]
 
+[[package]]
+name = "schemars"
+version = "0.8.22"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "3fbf2ae1b8bc8e02df939598064d22402220cd5bbcca1c76f7d6a310974d5615"
+dependencies = [
+ "chrono",
+ "dyn-clone",
+ "schemars_derive",
+ "serde",
+ "serde_json",
+]
+
 [[package]]
 name = "schemars"
 version = "0.9.0"
@@ -6594,6 +6668,18 @@ dependencies = [
  "serde_json",
 ]
 
+[[package]]
+name = "schemars_derive"
+version = "0.8.22"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "32e265784ad618884abaea0600a9adf15393368d840e0222d101a072f3f7534d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "serde_derive_internals",
+ "syn 2.0.104",
+]
+
 [[package]]
 name = "scoped-tls"
 version = "1.0.1"
@@ -6713,6 +6799,17 @@ dependencies = [
  "syn 2.0.104",
 ]
 
+[[package]]
+name = "serde_derive_internals"
+version = "0.29.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.104",
+]
+
 [[package]]
 name = "serde_json"
 version = "1.0.140"
@@ -6767,7 +6864,7 @@ dependencies = [
  "hex",
  "indexmap 1.9.3",
  "indexmap 2.9.0",
- "schemars",
+ "schemars 0.9.0",
  "serde",
  "serde_derive",
  "serde_json",
@@ -7066,6 +7163,19 @@ dependencies = [
  "lock_api",
 ]
 
+[[package]]
+name = "sse-stream"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "eb4dc4d33c68ec1f27d386b5610a351922656e1fdf5c05bbaad930cd1519479a"
+dependencies = [
+ "bytes",
+ "futures-util",
+ "http-body",
+ "http-body-util",
+ "pin-project-lite",
+]
+
 [[package]]
 name = "stable_deref_trait"
 version = "1.2.0"
diff --git a/Cargo.toml b/Cargo.toml
index f910068b..093f98e3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -37,6 +37,7 @@ members = [
     "core/connectors/sinks/stdout_sink",
     "core/connectors/sources/random_source",
     "core/integration",
+    "core/mcp",
     "core/sdk",
     "core/server",
     "core/tools",
diff --git a/core/mcp/Cargo.toml b/core/mcp/Cargo.toml
new file mode 100644
index 00000000..86941174
--- /dev/null
+++ b/core/mcp/Cargo.toml
@@ -0,0 +1,22 @@
+[package]
+name = "iggy-mcp"
+version = "0.1.0"
+edition = "2024"
+
+[dependencies]
+axum = { workspace = true }
+config = { workspace = true }
+dotenvy = { workspace = true }
+figlet-rs = { workspace = true }
+rmcp = { version = "0.2.1", features = [
+    "server",
+    "transport-io",
+    "transport-sse-server",
+    "transport-streamable-http-server",
+] }
+serde = { workspace = true }
+strum = { workspace = true }
+thiserror = { workspace = true }
+tokio = { workspace = true }
+tracing = { workspace = true }
+tracing-subscriber = { workspace = true }
diff --git a/core/mcp/config.toml b/core/mcp/config.toml
new file mode 100644
index 00000000..ab06c0ca
--- /dev/null
+++ b/core/mcp/config.toml
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+transport = "stdio"
+
+[http_api] # Optional HTTP API configuration
+address = "127.0.0.1:8082"
+# api_key = "secret" # Optional API key for authentication to be passed as `api-key` header
+
+[http_api.tls] # Optional TLS configuration for HTTP API
+enabled = false
+cert_file = "core/certs/iggy_cert.pem"
+key_file = "core/certs/iggy_key.pem"
+
+[iggy]
+address = "localhost:8090"
+username = "iggy"
+password = "iggy"
+# token = "secret" # Personal Access Token (PAT) can be used instead of username and password
diff --git a/core/mcp/src/configs.rs b/core/mcp/src/configs.rs
new file mode 100644
index 00000000..89908a4e
--- /dev/null
+++ b/core/mcp/src/configs.rs
@@ -0,0 +1,60 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use serde::{Deserialize, Serialize};
+use strum::Display;
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct McpServerConfig {
+    pub http_api: Option<HttpApiConfig>,
+    pub iggy: IggyConfig,
+    pub transport: McpTransport,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct IggyConfig {
+    pub address: String,
+    pub username: Option<String>,
+    pub password: Option<String>,
+    pub token: Option<String>,
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct HttpApiConfig {
+    pub address: String,
+    pub api_key: Option<String>,
+    pub tls: Option<HttpTlsConfig>,
+}
+
+#[derive(Debug, Deserialize, Serialize, Clone)]
+pub struct HttpTlsConfig {
+    pub enabled: bool,
+    pub cert_file: String,
+    pub key_file: String,
+}
+
+#[derive(Clone, Copy, Debug, Default, Display, PartialEq, Eq, Serialize, Deserialize)]
+#[strum(serialize_all = "snake_case")]
+#[serde(rename_all = "snake_case")]
+pub enum McpTransport {
+    #[default]
+    #[strum(to_string = "http")]
+    Http,
+    #[strum(to_string = "stdio")]
+    Stdio,
+}
diff --git a/core/mcp/src/error.rs b/core/mcp/src/error.rs
new file mode 100644
index 00000000..592e6db1
--- /dev/null
+++ b/core/mcp/src/error.rs
@@ -0,0 +1,29 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+pub enum McpRuntimeError {
+    #[error("Failed to create service")]
+    FailedToCreateService,
+    #[error("Missing configuration")]
+    MissingConfig,
+    #[error("Failed to start HTTP server")]
+    FailedToStartHttpServer,
+}
diff --git a/core/mcp/src/main.rs b/core/mcp/src/main.rs
new file mode 100644
index 00000000..fba53bac
--- /dev/null
+++ b/core/mcp/src/main.rs
@@ -0,0 +1,143 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use config::{Config, Environment, File};
+use configs::{McpServerConfig, McpTransport};
+use dotenvy::dotenv;
+use error::McpRuntimeError;
+use figlet_rs::FIGfont;
+use rmcp::{
+    ServiceExt,
+    transport::{
+        StreamableHttpService, stdio, streamable_http_server::session::local::LocalSessionManager,
+    },
+};
+use service::IggyService;
+use std::env;
+use tracing::{error, info};
+use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, util::SubscriberInitExt};
+
+mod configs;
+mod error;
+mod service;
+
+#[tokio::main]
+async fn main() -> Result<(), McpRuntimeError> {
+    let standard_font = FIGfont::standard().unwrap();
+    let figure = standard_font.convert("Iggy MCP Server");
+    eprintln!("{}", figure.unwrap());
+
+    if let Ok(env_path) = std::env::var("IGGY_MCP_ENV_PATH") {
+        if dotenvy::from_path(&env_path).is_ok() {
+            eprintln!("Loaded environment variables from path: {env_path}");
+        }
+    } else if let Ok(path) = dotenv() {
+        eprintln!(
+            "Loaded environment variables from .env file at path: {}",
+            path.display()
+        );
+    }
+
+    let config_path = env::var("IGGY_MCP_CONFIG_PATH").unwrap_or_else(|_| "config".to_string());
+
+    eprintln!("Loading configuration from: {config_path}");
+
+    let builder = Config::builder()
+        .add_source(File::with_name(&config_path))
+        .add_source(Environment::with_prefix("IGGY_MCP").separator("_"));
+
+    let config: McpServerConfig = builder
+        .build()
+        .expect("Failed to build runtime config")
+        .try_deserialize()
+        .expect("Failed to deserialize runtime config");
+
+    let transport = config.transport;
+    if transport == McpTransport::Stdio {
+        tracing_subscriber::fmt()
+            .with_env_filter(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("DEBUG")))
+            .with_writer(std::io::stderr)
+            .with_ansi(false)
+            .init();
+    } else {
+        Registry::default()
+            .with(tracing_subscriber::fmt::layer())
+            .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")))
+            .init();
+    }
+
+    info!("Starting Iggy MCP Server, transport: {transport}...");
+
+    if transport == McpTransport::Stdio {
+        let Ok(service) = IggyService::new().serve(stdio()).await.inspect_err(|e| {
+            error!("Serving error: {:?}", e);
+        }) else {
+            error!("Failed to create service");
+            return Err(McpRuntimeError::FailedToCreateService);
+        };
+
+        if let Err(error) = service.waiting().await {
+            error!("waiting error: {:?}", error);
+        }
+    } else {
+        let Some(http_config) = config.http_api else {
+            error!("HTTP API configuration not found");
+            return Err(McpRuntimeError::MissingConfig);
+        };
+
+        let service = StreamableHttpService::new(
+            || Ok(IggyService::new()),
+            LocalSessionManager::default().into(),
+            Default::default(),
+        );
+
+        let router = axum::Router::new().nest_service("/mcp", service);
+        let tcp_listener = tokio::net::TcpListener::bind(&http_config.address)
+            .await
+            .map_err(|error| {
+                error!("Failed to bind TCP listener: {:?}", error);
+                McpRuntimeError::FailedToStartHttpServer
+            })?;
+        info!("HTTP API listening on: {}", http_config.address);
+        let _ = axum::serve(tcp_listener, router)
+            .with_graceful_shutdown(async { tokio::signal::ctrl_c().await.unwrap() })
+            .await;
+    }
+
+    #[cfg(unix)]
+    let (mut ctrl_c, mut sigterm) = {
+        use tokio::signal::unix::{SignalKind, signal};
+        (
+            signal(SignalKind::interrupt()).expect("Failed to create SIGINT signal"),
+            signal(SignalKind::terminate()).expect("Failed to create SIGTERM signal"),
+        )
+    };
+
+    #[cfg(unix)]
+    tokio::select! {
+        _ = ctrl_c.recv() => {
+            info!("Received SIGINT. Shutting down Iggy MCP Server...");
+        },
+        _ = sigterm.recv() => {
+            info!("Received SIGTERM. Shutting down Iggy MCP Server...");
+        }
+    }
+
+    info!("Iggy MCP Server stopped successfully");
+    Ok(())
+}
diff --git a/core/mcp/src/service/mod.rs b/core/mcp/src/service/mod.rs
new file mode 100644
index 00000000..77dad561
--- /dev/null
+++ b/core/mcp/src/service/mod.rs
@@ -0,0 +1,60 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use rmcp::{
+    ServerHandler,
+    handler::server::{router::tool::ToolRouter, tool::Parameters},
+    model::{ServerCapabilities, ServerInfo},
+    schemars, tool, tool_handler, tool_router,
+};
+
+#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
+pub struct HelloRequest {
+    #[schemars(description = "your name")]
+    pub name: String,
+}
+
+#[derive(Debug, Clone)]
+pub struct IggyService {
+    tool_router: ToolRouter<Self>,
+}
+
+#[tool_router]
+impl IggyService {
+    pub fn new() -> Self {
+        Self {
+            tool_router: Self::tool_router(),
+        }
+    }
+
+    #[tool(description = "Say hello")]
+    fn hello(&self, Parameters(HelloRequest { name }): Parameters<HelloRequest>) -> String {
+        format!("Hello, {name}!")
+    }
+}
+
+#[tool_handler]
+impl ServerHandler for IggyService {
+    fn get_info(&self) -> ServerInfo {
+        ServerInfo {
+            instructions: Some("Iggy service".into()),
+            capabilities: ServerCapabilities::builder().enable_tools().build(),
+            ..Default::default()
+        }
+    }
+}

Reply via email to