This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch mcp
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/mcp by this push:
new d1c3cd81 add basic crud permissions for mcp config
d1c3cd81 is described below
commit d1c3cd81234069c41abc7fc7bd20fc70321151ed
Author: spetz <[email protected]>
AuthorDate: Wed Jul 16 11:42:17 2025 +0200
add basic crud permissions for mcp config
---
core/ai/mcp/config.toml | 6 ++++
core/ai/mcp/src/configs.rs | 9 ++++++
core/ai/mcp/src/main.rs | 71 ++++++++++++++++++++++++++++++++++++++++--
core/ai/mcp/src/service/mod.rs | 16 +++++++++-
4 files changed, 99 insertions(+), 3 deletions(-)
diff --git a/core/ai/mcp/config.toml b/core/ai/mcp/config.toml
index ab06c0ca..7bd59fd0 100644
--- a/core/ai/mcp/config.toml
+++ b/core/ai/mcp/config.toml
@@ -31,3 +31,9 @@ address = "localhost:8090"
username = "iggy"
password = "iggy"
# token = "secret" # Personal Access Token (PAT) can be used instead of
username and password
+
+[permissions]
+create = true
+read = true
+update = true
+delete = true
diff --git a/core/ai/mcp/src/configs.rs b/core/ai/mcp/src/configs.rs
index 89908a4e..85957b94 100644
--- a/core/ai/mcp/src/configs.rs
+++ b/core/ai/mcp/src/configs.rs
@@ -23,6 +23,7 @@ use strum::Display;
pub struct McpServerConfig {
pub http_api: Option<HttpApiConfig>,
pub iggy: IggyConfig,
+ pub permissions: PermissionsConfig,
pub transport: McpTransport,
}
@@ -41,6 +42,14 @@ pub struct HttpApiConfig {
pub tls: Option<HttpTlsConfig>,
}
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct PermissionsConfig {
+ pub create: bool,
+ pub read: bool,
+ pub update: bool,
+ pub delete: bool,
+}
+
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct HttpTlsConfig {
pub enabled: bool,
diff --git a/core/ai/mcp/src/main.rs b/core/ai/mcp/src/main.rs
index 59f2ff8c..7eeb4342 100644
--- a/core/ai/mcp/src/main.rs
+++ b/core/ai/mcp/src/main.rs
@@ -23,6 +23,7 @@ use error::McpRuntimeError;
use figlet_rs::FIGfont;
use rmcp::{
ServiceExt,
+ model::ErrorData,
transport::{
StreamableHttpService, stdio,
streamable_http_server::session::local::LocalSessionManager,
},
@@ -87,9 +88,15 @@ async fn main() -> Result<(), McpRuntimeError> {
let iggy_clients = stream::init(config.iggy).await?;
let consumer = Arc::new(iggy_clients.consumer);
let producer = Arc::new(iggy_clients.producer);
+ let permissions = Permissions {
+ create: config.permissions.create,
+ read: config.permissions.read,
+ update: config.permissions.update,
+ delete: config.permissions.delete,
+ };
if transport == McpTransport::Stdio {
- let Ok(service) = IggyService::new(consumer, producer)
+ let Ok(service) = IggyService::new(consumer, producer, permissions)
.serve(stdio())
.await
.inspect_err(|e| {
@@ -110,7 +117,13 @@ async fn main() -> Result<(), McpRuntimeError> {
};
let service = StreamableHttpService::new(
- move || Ok(IggyService::new(consumer.clone(), producer.clone())),
+ move || {
+ Ok(IggyService::new(
+ consumer.clone(),
+ producer.clone(),
+ permissions,
+ ))
+ },
LocalSessionManager::default().into(),
Default::default(),
);
@@ -150,3 +163,57 @@ async fn main() -> Result<(), McpRuntimeError> {
info!("Iggy MCP Server stopped successfully");
Ok(())
}
+
+#[derive(Debug, Copy, Clone)]
+pub struct Permissions {
+ create: bool,
+ read: bool,
+ update: bool,
+ delete: bool,
+}
+
+impl Permissions {
+ pub fn ensure_read(&self) -> Result<(), ErrorData> {
+ if self.read {
+ Ok(())
+ } else {
+ Err(ErrorData::invalid_request(
+ "Insufficient 'read' permissions",
+ None,
+ ))
+ }
+ }
+
+ pub fn ensure_create(&self) -> Result<(), ErrorData> {
+ if self.create {
+ Ok(())
+ } else {
+ Err(ErrorData::invalid_request(
+ "Insufficient 'create' permissions",
+ None,
+ ))
+ }
+ }
+
+ pub fn ensure_update(&self) -> Result<(), ErrorData> {
+ if self.update {
+ Ok(())
+ } else {
+ Err(ErrorData::invalid_request(
+ "Insufficient 'update' permissions",
+ None,
+ ))
+ }
+ }
+
+ pub fn ensure_delete(&self) -> Result<(), ErrorData> {
+ if self.delete {
+ Ok(())
+ } else {
+ Err(ErrorData::invalid_request(
+ "Insufficient 'delete' permissions",
+ None,
+ ))
+ }
+ }
+}
diff --git a/core/ai/mcp/src/service/mod.rs b/core/ai/mcp/src/service/mod.rs
index d69bee31..71291595 100644
--- a/core/ai/mcp/src/service/mod.rs
+++ b/core/ai/mcp/src/service/mod.rs
@@ -28,6 +28,8 @@ use rmcp::{
};
use serde::Serialize;
use tracing::error;
+
+use crate::Permissions;
mod requests;
#[derive(Debug, Clone)]
@@ -35,15 +37,21 @@ pub struct IggyService {
tool_router: ToolRouter<Self>,
consumer: Arc<IggyClient>,
_producer: Arc<IggyClient>,
+ permissions: Permissions,
}
#[tool_router]
impl IggyService {
- pub fn new(consumer: Arc<IggyClient>, producer: Arc<IggyClient>) -> Self {
+ pub fn new(
+ consumer: Arc<IggyClient>,
+ producer: Arc<IggyClient>,
+ permissions: Permissions,
+ ) -> Self {
Self {
tool_router: Self::tool_router(),
consumer,
_producer: producer,
+ permissions,
}
}
@@ -52,11 +60,13 @@ impl IggyService {
&self,
Parameters(GetStream { stream_id }): Parameters<GetStream>,
) -> Result<CallToolResult, ErrorData> {
+ self.permissions.ensure_read()?;
request(self.consumer.get_stream(&id(&stream_id)?).await)
}
#[tool(description = "Get streams")]
pub async fn get_streams(&self) -> Result<CallToolResult, ErrorData> {
+ self.permissions.ensure_read()?;
request(self.consumer.get_streams().await)
}
@@ -65,6 +75,7 @@ impl IggyService {
&self,
Parameters(CreateStream { name, stream_id }): Parameters<CreateStream>,
) -> Result<CallToolResult, ErrorData> {
+ self.permissions.ensure_create()?;
request(self.consumer.create_stream(&name, stream_id).await)
}
@@ -73,6 +84,7 @@ impl IggyService {
&self,
Parameters(UpdateStream { stream_id, name }): Parameters<UpdateStream>,
) -> Result<CallToolResult, ErrorData> {
+ self.permissions.ensure_update()?;
request(self.consumer.update_stream(&id(&stream_id)?, &name).await)
}
@@ -81,6 +93,7 @@ impl IggyService {
&self,
Parameters(DeleteStream { stream_id }): Parameters<DeleteStream>,
) -> Result<CallToolResult, ErrorData> {
+ self.permissions.ensure_delete()?;
request(self.consumer.delete_stream(&id(&stream_id)?).await)
}
@@ -89,6 +102,7 @@ impl IggyService {
&self,
Parameters(PurgeStream { stream_id }): Parameters<PurgeStream>,
) -> Result<CallToolResult, ErrorData> {
+ self.permissions.ensure_delete()?;
request(self.consumer.purge_stream(&id(&stream_id)?).await)
}
}