This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch connectors_api
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit e15e23a014a897a9a74de28660bab8c89eb21a98
Author: spetz <[email protected]>
AuthorDate: Tue Jun 10 20:31:04 2025 +0200

    add initial version of connectors http api
---
 Cargo.lock                                    |   1 +
 core/connectors/runtime/Cargo.toml            |   1 +
 core/connectors/runtime/config.toml           |   4 +
 core/connectors/runtime/src/api.rs            | 228 ++++++++++++++++++++++++++
 core/connectors/runtime/src/configs.rs        |  19 ++-
 core/connectors/runtime/src/context.rs        |  88 ++++++++++
 core/connectors/runtime/src/error.rs          |  18 ++
 core/connectors/runtime/src/main.rs           |  26 ++-
 core/connectors/runtime/src/manager/mod.rs    |  20 +++
 core/connectors/runtime/src/manager/sink.rs   |  79 +++++++++
 core/connectors/runtime/src/manager/source.rs |  79 +++++++++
 core/connectors/runtime/src/sink.rs           |   6 +
 core/connectors/runtime/src/source.rs         |   6 +
 13 files changed, 561 insertions(+), 14 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 6be9aa34..7e021dd6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3857,6 +3857,7 @@ dependencies = [
 name = "iggy_connector_runtime"
 version = "0.1.0"
 dependencies = [
+ "axum 0.8.4",
  "config",
  "dashmap",
  "dlopen2",
diff --git a/core/connectors/runtime/Cargo.toml 
b/core/connectors/runtime/Cargo.toml
index a975d0ae..75121533 100644
--- a/core/connectors/runtime/Cargo.toml
+++ b/core/connectors/runtime/Cargo.toml
@@ -29,6 +29,7 @@ repository = "https://github.com/apache/iggy";
 readme = "../../README.md"
 
 [dependencies]
+axum = { workspace = true }
 config = { workspace = true }
 dashmap = { workspace = true }
 dlopen2 = { workspace = true }
diff --git a/core/connectors/runtime/config.toml 
b/core/connectors/runtime/config.toml
index d7361472..f9d5e14d 100644
--- a/core/connectors/runtime/config.toml
+++ b/core/connectors/runtime/config.toml
@@ -15,6 +15,10 @@
 # specific language governing permissions and limitations
 # under the License.
 
+[http_api]
+enabled = true
+address = "127.0.0.1:8081"
+
 [iggy]
 address = "localhost:8090"
 username = "iggy"
diff --git a/core/connectors/runtime/src/api.rs 
b/core/connectors/runtime/src/api.rs
new file mode 100644
index 00000000..5c72b8f9
--- /dev/null
+++ b/core/connectors/runtime/src/api.rs
@@ -0,0 +1,228 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::sync::Arc;
+
+use axum::{
+    Json, Router,
+    extract::{Path, State},
+    http::StatusCode,
+    response::IntoResponse,
+    routing::get,
+};
+use serde::{Deserialize, Serialize};
+use thiserror::Error;
+use tokio::{net::TcpListener, spawn};
+use tracing::{error, info};
+
+use crate::{
+    context::RuntimeContext,
+    error::RuntimeError,
+    manager::{
+        sink::{SinkDetailsResponse, SinkInfoResponse},
+        source::{SourceDetailsResponse, SourceInfoResponse},
+    },
+};
+
+const NAME: &str = env!("CARGO_PKG_NAME");
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct HttpApiConfig {
+    pub enabled: bool,
+    pub address: String,
+}
+
+pub async fn init(config: &HttpApiConfig, context: Arc<RuntimeContext>) {
+    if !config.enabled {
+        info!("HTTP API is disabled");
+        return;
+    }
+
+    let address = config.address.to_owned();
+    let app = Router::new()
+        .route("/", get(|| async { "Connector Runtime API" }))
+        .route(
+            "/health",
+            get(|| async { Json(serde_json::json!({ "status": "healthy" })) }),
+        )
+        .route("/sinks", get(get_sinks))
+        .route("/sinks/{key}", get(get_sink))
+        .route("/sinks/{key}/config", get(get_sink_config))
+        .route("/sources", get(get_sources))
+        .route("/sources/{key}", get(get_source))
+        .route("/sources/{key}/config", get(get_source_config))
+        .with_state(context);
+
+    let Ok(listener) = TcpListener::bind(&address).await else {
+        panic!("Failed to bind to HTTP API address: {address}");
+    };
+
+    spawn(async move {
+        info!("Started {NAME} HTTP API on: {address}");
+        if let Err(error) = axum::serve(listener, 
app.into_make_service()).await {
+            panic!("Failed to start {NAME} HTTP API on: {address}. {error}");
+        }
+    });
+}
+
+async fn get_sinks(
+    State(context): State<Arc<RuntimeContext>>,
+) -> Result<Json<Vec<SinkInfoResponse>>, ApiError> {
+    let sinks = context
+        .sinks
+        .get_all()
+        .into_iter()
+        .map(|(key, sink)| SinkInfoResponse {
+            id: sink.id,
+            key: key.to_owned(),
+            name: sink.name.to_owned(),
+            path: sink.path.to_owned(),
+            enabled: sink.enabled,
+            running: sink.running,
+        })
+        .collect::<Vec<_>>();
+    Ok(Json(sinks))
+}
+
+async fn get_sink(
+    State(context): State<Arc<RuntimeContext>>,
+    Path(key): Path<String>,
+) -> Result<Json<SinkDetailsResponse>, ApiError> {
+    let Some(sink) = context.sinks.get(&key) else {
+        return Err(ApiError::Error(RuntimeError::SinkNotFound(key)));
+    };
+    let sink = SinkDetailsResponse {
+        info: SinkInfoResponse {
+            id: sink.info.id,
+            key: sink.key.to_owned(),
+            name: sink.info.name.to_owned(),
+            path: sink.info.path.to_owned(),
+            enabled: sink.info.enabled,
+            running: sink.info.running,
+        },
+    };
+    Ok(Json(sink))
+}
+
+async fn get_sink_config(
+    State(context): State<Arc<RuntimeContext>>,
+    Path(key): Path<String>,
+) -> Result<Json<Option<serde_json::Value>>, ApiError> {
+    let Some(sink) = context.sinks.get(&key) else {
+        return Err(ApiError::Error(RuntimeError::SinkNotFound(key)));
+    };
+    Ok(Json(sink.config.clone()))
+}
+
+async fn get_sources(
+    State(context): State<Arc<RuntimeContext>>,
+) -> Result<Json<Vec<SourceInfoResponse>>, ApiError> {
+    let sources = context
+        .sources
+        .get_all()
+        .into_iter()
+        .map(|(key, source)| SourceInfoResponse {
+            id: source.id,
+            key: key.to_owned(),
+            name: source.name.to_owned(),
+            path: source.path.to_owned(),
+            enabled: source.enabled,
+            running: source.running,
+        })
+        .collect::<Vec<_>>();
+    Ok(Json(sources))
+}
+
+async fn get_source(
+    State(context): State<Arc<RuntimeContext>>,
+    Path(key): Path<String>,
+) -> Result<Json<SourceDetailsResponse>, ApiError> {
+    let Some(source) = context.sources.get(&key) else {
+        return Err(ApiError::Error(RuntimeError::SourceNotFound(key)));
+    };
+    let source = SourceDetailsResponse {
+        info: SourceInfoResponse {
+            id: source.info.id,
+            key: source.key.to_owned(),
+            name: source.info.name.to_owned(),
+            path: source.info.path.to_owned(),
+            enabled: source.info.enabled,
+            running: source.info.running,
+        },
+    };
+    Ok(Json(source))
+}
+
+async fn get_source_config(
+    State(context): State<Arc<RuntimeContext>>,
+    Path(key): Path<String>,
+) -> Result<Json<Option<serde_json::Value>>, ApiError> {
+    let Some(sink) = context.sources.get(&key) else {
+        return Err(ApiError::Error(RuntimeError::SourceNotFound(key)));
+    };
+    Ok(Json(sink.config.clone()))
+}
+
+#[derive(Debug, Error)]
+enum ApiError {
+    #[error(transparent)]
+    Error(#[from] RuntimeError),
+    #[error(transparent)]
+    JsonError(#[from] serde_json::Error),
+}
+
+#[derive(Debug, Serialize)]
+pub struct ErrorResponse {
+    pub code: String,
+    pub reason: String,
+}
+
+impl IntoResponse for ApiError {
+    fn into_response(self) -> axum::response::Response {
+        match self {
+            ApiError::Error(error) => {
+                error!("There was an error: {error}");
+                let status_code = match error {
+                    RuntimeError::MissingIggyCredentials => 
StatusCode::BAD_REQUEST,
+                    RuntimeError::InvalidConfiguration(_) => 
StatusCode::BAD_REQUEST,
+                    RuntimeError::SinkNotFound(_) => StatusCode::NOT_FOUND,
+                    RuntimeError::SourceNotFound(_) => StatusCode::NOT_FOUND,
+                    _ => StatusCode::INTERNAL_SERVER_ERROR,
+                };
+                (
+                    status_code,
+                    Json(ErrorResponse {
+                        code: error.as_code().to_owned(),
+                        reason: error.to_string(),
+                    }),
+                )
+            }
+            ApiError::JsonError(error) => {
+                error!("There was a JSON error: {error}");
+                (
+                    StatusCode::INTERNAL_SERVER_ERROR,
+                    Json(ErrorResponse {
+                        code: "json_error".to_owned(),
+                        reason: error.to_string(),
+                    }),
+                )
+            }
+        }
+        .into_response()
+    }
+}
diff --git a/core/connectors/runtime/src/configs.rs 
b/core/connectors/runtime/src/configs.rs
index aa6d8fa8..b1ecb504 100644
--- a/core/connectors/runtime/src/configs.rs
+++ b/core/connectors/runtime/src/configs.rs
@@ -20,14 +20,17 @@ use iggy_connector_sdk::{Schema, transforms::TransformType};
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 
-#[derive(Debug, Deserialize, Serialize)]
+use crate::api::HttpApiConfig;
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
 pub struct RuntimeConfig {
+    pub http_api: HttpApiConfig,
     pub iggy: IggyConfig,
     pub sinks: HashMap<String, SinkConfig>,
     pub sources: HashMap<String, SourceConfig>,
 }
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct IggyConfig {
     pub address: String,
     pub username: Option<String>,
@@ -35,7 +38,7 @@ pub struct IggyConfig {
     pub token: Option<String>,
 }
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct SinkConfig {
     pub enabled: bool,
     pub name: String,
@@ -45,7 +48,7 @@ pub struct SinkConfig {
     pub config: Option<serde_json::Value>,
 }
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct StreamConsumerConfig {
     pub stream: String,
     pub topics: Vec<String>,
@@ -55,7 +58,7 @@ pub struct StreamConsumerConfig {
     pub consumer_group: Option<String>,
 }
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct StreamProducerConfig {
     pub stream: String,
     pub topic: String,
@@ -64,7 +67,7 @@ pub struct StreamProducerConfig {
     pub send_interval: Option<String>,
 }
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct SourceConfig {
     pub enabled: bool,
     pub name: String,
@@ -74,13 +77,13 @@ pub struct SourceConfig {
     pub config: Option<serde_json::Value>,
 }
 
-#[derive(Debug, Default, Serialize, Deserialize)]
+#[derive(Debug, Clone, Default, Serialize, Deserialize)]
 pub struct TransformsConfig {
     #[serde(flatten)]
     pub transforms: HashMap<TransformType, serde_json::Value>,
 }
 
-#[derive(Debug, Default, Serialize, Deserialize)]
+#[derive(Debug, Clone, Default, Serialize, Deserialize)]
 pub struct SharedTransformConfig {
     pub enabled: bool,
 }
diff --git a/core/connectors/runtime/src/context.rs 
b/core/connectors/runtime/src/context.rs
new file mode 100644
index 00000000..c2a3f194
--- /dev/null
+++ b/core/connectors/runtime/src/context.rs
@@ -0,0 +1,88 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{
+    SinkConnectorWrapper, SourceConnectorWrapper,
+    configs::RuntimeConfig,
+    manager::{
+        sink::{SinkDetails, SinkInfo, SinkManager},
+        source::{SourceDetails, SourceInfo, SourceManager},
+    },
+};
+
+use tracing::error;
+
+pub struct RuntimeContext {
+    pub sinks: SinkManager,
+    pub sources: SourceManager,
+}
+
+pub fn init(
+    config: &RuntimeConfig,
+    sink_wrappers: &[SinkConnectorWrapper],
+    source_wrappers: &[SourceConnectorWrapper],
+) -> RuntimeContext {
+    let mut sinks = vec![];
+    for sink_wrapper in sink_wrappers.iter() {
+        for sink_plugin in sink_wrapper.plugins.iter() {
+            let Some(sink_config) = config.sinks.get(&sink_plugin.key) else {
+                error!("Missing sink config for: {}", sink_plugin.key);
+                continue;
+            };
+
+            sinks.push(SinkDetails {
+                key: sink_plugin.key.to_owned(),
+                info: SinkInfo {
+                    id: sink_plugin.id,
+                    name: sink_plugin.name.to_owned(),
+                    path: sink_plugin.path.to_owned(),
+                    enabled: sink_config.enabled,
+                    running: true,
+                },
+                config: sink_config.config.clone(),
+            });
+        }
+    }
+
+    let mut sources = vec![];
+    for source_wrapper in source_wrappers.iter() {
+        for source_plugin in source_wrapper.plugins.iter() {
+            let Some(source_config) = config.sources.get(&source_plugin.key) 
else {
+                error!("Missing source config for: {}", source_plugin.key);
+                continue;
+            };
+
+            sources.push(SourceDetails {
+                key: source_plugin.key.to_owned(),
+                info: SourceInfo {
+                    id: source_plugin.id,
+                    name: source_plugin.name.to_owned(),
+                    path: source_plugin.path.to_owned(),
+                    enabled: source_config.enabled,
+                    running: true,
+                },
+                config: source_config.config.clone(),
+            });
+        }
+    }
+
+    RuntimeContext {
+        sinks: SinkManager::new(sinks),
+        sources: SourceManager::new(sources),
+    }
+}
diff --git a/core/connectors/runtime/src/error.rs 
b/core/connectors/runtime/src/error.rs
index 702353e8..8d26ef0f 100644
--- a/core/connectors/runtime/src/error.rs
+++ b/core/connectors/runtime/src/error.rs
@@ -38,4 +38,22 @@ pub enum RuntimeError {
     IggyError(#[from] iggy::prelude::IggyError),
     #[error("Missing Iggy credentials")]
     MissingIggyCredentials,
+    #[error("JSON error")]
+    JsonError(#[from] serde_json::Error),
+    #[error("Sink not found with key: {0}")]
+    SinkNotFound(String),
+    #[error("Source not found with key: {0}")]
+    SourceNotFound(String),
+}
+
+impl RuntimeError {
+    pub fn as_code(&self) -> &'static str {
+        match self {
+            RuntimeError::SinkNotFound(_) => "sink_not_found",
+            RuntimeError::SourceNotFound(_) => "source_not_found",
+            RuntimeError::MissingIggyCredentials => "invalid_configuration",
+            RuntimeError::InvalidConfiguration(_) => "invalid_configuration",
+            _ => "error",
+        }
+    }
 }
diff --git a/core/connectors/runtime/src/main.rs 
b/core/connectors/runtime/src/main.rs
index f87057bf..f3737c93 100644
--- a/core/connectors/runtime/src/main.rs
+++ b/core/connectors/runtime/src/main.rs
@@ -36,8 +36,11 @@ use std::{
 use tracing::{debug, error, info};
 use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, 
util::SubscriberInitExt};
 
+mod api;
 pub(crate) mod configs;
+pub(crate) mod context;
 pub(crate) mod error;
+mod manager;
 mod sink;
 mod source;
 mod transform;
@@ -91,10 +94,11 @@ async fn main() -> Result<(), RuntimeError> {
         .try_deserialize()
         .expect("Failed to deserialize runtime config");
 
-    let iggy_address = config.iggy.address;
-    let iggy_username = config.iggy.username;
-    let iggy_password = config.iggy.password;
-    let iggy_token = config.iggy.token;
+    let iggy_config = config.iggy.clone();
+    let iggy_address = iggy_config.address;
+    let iggy_username = iggy_config.username;
+    let iggy_password = iggy_config.password;
+    let iggy_token = iggy_config.token;
     let consumer_client = create_iggy_client(
         &iggy_address,
         iggy_username.as_deref(),
@@ -112,8 +116,8 @@ async fn main() -> Result<(), RuntimeError> {
     .await?;
     producer_client.connect().await?;
 
-    let sources = source::init(config.sources, &producer_client).await?;
-    let sinks = sink::init(config.sinks, &consumer_client).await?;
+    let sources = source::init(config.sources.clone(), 
&producer_client).await?;
+    let sinks = sink::init(config.sinks.clone(), &consumer_client).await?;
 
     let mut sink_wrappers = vec![];
     let mut sink_with_plugins = HashMap::new();
@@ -149,6 +153,10 @@ async fn main() -> Result<(), RuntimeError> {
         );
     }
 
+    let context = context::init(&config, &sink_wrappers, &source_wrappers);
+    let context = Arc::new(context);
+    api::init(&config.http_api, context).await;
+
     source::handle(source_wrappers);
     sink::consume(sink_wrappers);
     info!("All sources and sinks spawned.");
@@ -261,6 +269,9 @@ struct SinkConnector {
 
 struct SinkConnectorPlugin {
     id: u32,
+    key: String,
+    name: String,
+    path: String,
     consumers: Vec<SinkConnectorConsumer>,
 }
 
@@ -288,6 +299,9 @@ struct SourceConnector {
 
 struct SourceConnectorPlugin {
     id: u32,
+    key: String,
+    name: String,
+    path: String,
     transforms: Vec<Arc<dyn Transform>>,
     producer: Option<SourceConnectorProducer>,
 }
diff --git a/core/connectors/runtime/src/manager/mod.rs 
b/core/connectors/runtime/src/manager/mod.rs
new file mode 100644
index 00000000..8efc27b1
--- /dev/null
+++ b/core/connectors/runtime/src/manager/mod.rs
@@ -0,0 +1,20 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+pub mod sink;
+pub mod source;
diff --git a/core/connectors/runtime/src/manager/sink.rs 
b/core/connectors/runtime/src/manager/sink.rs
new file mode 100644
index 00000000..db950e45
--- /dev/null
+++ b/core/connectors/runtime/src/manager/sink.rs
@@ -0,0 +1,79 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+
+#[derive(Debug)]
+pub struct SinkManager {
+    sinks: HashMap<String, SinkDetails>,
+}
+
+impl SinkManager {
+    pub fn new(sinks: Vec<SinkDetails>) -> Self {
+        Self {
+            sinks: sinks
+                .into_iter()
+                .map(|sink| (sink.key.to_owned(), sink))
+                .collect(),
+        }
+    }
+
+    pub fn get(&self, key: &str) -> Option<&SinkDetails> {
+        self.sinks.get(key)
+    }
+
+    pub fn get_all(&self) -> HashMap<&String, &SinkInfo> {
+        self.sinks
+            .iter()
+            .map(|(key, sink)| (key, &sink.info))
+            .collect()
+    }
+}
+
+#[derive(Debug)]
+pub struct SinkDetails {
+    pub key: String,
+    pub info: SinkInfo,
+    pub config: Option<serde_json::Value>,
+}
+
+#[derive(Debug)]
+pub struct SinkInfo {
+    pub id: u32,
+    pub name: String,
+    pub path: String,
+    pub enabled: bool,
+    pub running: bool,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct SinkInfoResponse {
+    pub id: u32,
+    pub key: String,
+    pub name: String,
+    pub path: String,
+    pub enabled: bool,
+    pub running: bool,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct SinkDetailsResponse {
+    #[serde(flatten)]
+    pub info: SinkInfoResponse,
+}
diff --git a/core/connectors/runtime/src/manager/source.rs 
b/core/connectors/runtime/src/manager/source.rs
new file mode 100644
index 00000000..6ef774d8
--- /dev/null
+++ b/core/connectors/runtime/src/manager/source.rs
@@ -0,0 +1,79 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+
+#[derive(Debug)]
+pub struct SourceManager {
+    sources: HashMap<String, SourceDetails>,
+}
+
+impl SourceManager {
+    pub fn new(sources: Vec<SourceDetails>) -> Self {
+        Self {
+            sources: sources
+                .into_iter()
+                .map(|source| (source.key.to_owned(), source))
+                .collect(),
+        }
+    }
+
+    pub fn get(&self, key: &str) -> Option<&SourceDetails> {
+        self.sources.get(key)
+    }
+
+    pub fn get_all(&self) -> HashMap<&String, &SourceInfo> {
+        self.sources
+            .iter()
+            .map(|(key, source)| (key, &source.info))
+            .collect()
+    }
+}
+
+#[derive(Debug)]
+pub struct SourceDetails {
+    pub key: String,
+    pub info: SourceInfo,
+    pub config: Option<serde_json::Value>,
+}
+
+#[derive(Debug)]
+pub struct SourceInfo {
+    pub id: u32,
+    pub name: String,
+    pub path: String,
+    pub enabled: bool,
+    pub running: bool,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct SourceInfoResponse {
+    pub id: u32,
+    pub key: String,
+    pub name: String,
+    pub path: String,
+    pub enabled: bool,
+    pub running: bool,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct SourceDetailsResponse {
+    #[serde(flatten)]
+    pub info: SourceInfoResponse,
+}
diff --git a/core/connectors/runtime/src/sink.rs 
b/core/connectors/runtime/src/sink.rs
index 17b014eb..6f6946a7 100644
--- a/core/connectors/runtime/src/sink.rs
+++ b/core/connectors/runtime/src/sink.rs
@@ -63,6 +63,9 @@ pub async fn init(
             );
             container.plugins.push(SinkConnectorPlugin {
                 id: plugin_id,
+                key: key.to_owned(),
+                name: name.to_owned(),
+                path: path.to_owned(),
                 consumers: vec![],
             });
         } else {
@@ -76,6 +79,9 @@ pub async fn init(
                     container,
                     plugins: vec![SinkConnectorPlugin {
                         id: plugin_id,
+                        key: key.to_owned(),
+                        name: name.to_owned(),
+                        path: path.to_owned(),
                         consumers: vec![],
                     }],
                 },
diff --git a/core/connectors/runtime/src/source.rs 
b/core/connectors/runtime/src/source.rs
index 9cde82ca..bdc00123 100644
--- a/core/connectors/runtime/src/source.rs
+++ b/core/connectors/runtime/src/source.rs
@@ -63,6 +63,9 @@ pub async fn init(
             );
             container.plugins.push(SourceConnectorPlugin {
                 id: plugin_id,
+                key: key.to_owned(),
+                name: name.to_owned(),
+                path: path.to_owned(),
                 producer: None,
                 transforms: vec![],
             });
@@ -77,6 +80,9 @@ pub async fn init(
                     container,
                     plugins: vec![SourceConnectorPlugin {
                         id: plugin_id,
+                        key: key.to_owned(),
+                        name: name.to_owned(),
+                        path: path.to_owned(),
                         producer: None,
                         transforms: vec![],
                     }],

Reply via email to