This is an automated email from the ASF dual-hosted git repository. piotr pushed a commit to branch connectors_api in repository https://gitbox.apache.org/repos/asf/iggy.git
commit e15e23a014a897a9a74de28660bab8c89eb21a98 Author: spetz <[email protected]> AuthorDate: Tue Jun 10 20:31:04 2025 +0200 add initial version of connectors http api --- Cargo.lock | 1 + core/connectors/runtime/Cargo.toml | 1 + core/connectors/runtime/config.toml | 4 + core/connectors/runtime/src/api.rs | 228 ++++++++++++++++++++++++++ core/connectors/runtime/src/configs.rs | 19 ++- core/connectors/runtime/src/context.rs | 88 ++++++++++ core/connectors/runtime/src/error.rs | 18 ++ core/connectors/runtime/src/main.rs | 26 ++- core/connectors/runtime/src/manager/mod.rs | 20 +++ core/connectors/runtime/src/manager/sink.rs | 79 +++++++++ core/connectors/runtime/src/manager/source.rs | 79 +++++++++ core/connectors/runtime/src/sink.rs | 6 + core/connectors/runtime/src/source.rs | 6 + 13 files changed, 561 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6be9aa34..7e021dd6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3857,6 +3857,7 @@ dependencies = [ name = "iggy_connector_runtime" version = "0.1.0" dependencies = [ + "axum 0.8.4", "config", "dashmap", "dlopen2", diff --git a/core/connectors/runtime/Cargo.toml b/core/connectors/runtime/Cargo.toml index a975d0ae..75121533 100644 --- a/core/connectors/runtime/Cargo.toml +++ b/core/connectors/runtime/Cargo.toml @@ -29,6 +29,7 @@ repository = "https://github.com/apache/iggy" readme = "../../README.md" [dependencies] +axum = { workspace = true } config = { workspace = true } dashmap = { workspace = true } dlopen2 = { workspace = true } diff --git a/core/connectors/runtime/config.toml b/core/connectors/runtime/config.toml index d7361472..f9d5e14d 100644 --- a/core/connectors/runtime/config.toml +++ b/core/connectors/runtime/config.toml @@ -15,6 +15,10 @@ # specific language governing permissions and limitations # under the License. +[http_api] +enabled = true +address = "127.0.0.1:8081" + [iggy] address = "localhost:8090" username = "iggy" diff --git a/core/connectors/runtime/src/api.rs b/core/connectors/runtime/src/api.rs new file mode 100644 index 00000000..5c72b8f9 --- /dev/null +++ b/core/connectors/runtime/src/api.rs @@ -0,0 +1,228 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use std::sync::Arc; + +use axum::{ + Json, Router, + extract::{Path, State}, + http::StatusCode, + response::IntoResponse, + routing::get, +}; +use serde::{Deserialize, Serialize}; +use thiserror::Error; +use tokio::{net::TcpListener, spawn}; +use tracing::{error, info}; + +use crate::{ + context::RuntimeContext, + error::RuntimeError, + manager::{ + sink::{SinkDetailsResponse, SinkInfoResponse}, + source::{SourceDetailsResponse, SourceInfoResponse}, + }, +}; + +const NAME: &str = env!("CARGO_PKG_NAME"); + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct HttpApiConfig { + pub enabled: bool, + pub address: String, +} + +pub async fn init(config: &HttpApiConfig, context: Arc<RuntimeContext>) { + if !config.enabled { + info!("HTTP API is disabled"); + return; + } + + let address = config.address.to_owned(); + let app = Router::new() + .route("/", get(|| async { "Connector Runtime API" })) + .route( + "/health", + get(|| async { Json(serde_json::json!({ "status": "healthy" })) }), + ) + .route("/sinks", get(get_sinks)) + .route("/sinks/{key}", get(get_sink)) + .route("/sinks/{key}/config", get(get_sink_config)) + .route("/sources", get(get_sources)) + .route("/sources/{key}", get(get_source)) + .route("/sources/{key}/config", get(get_source_config)) + .with_state(context); + + let Ok(listener) = TcpListener::bind(&address).await else { + panic!("Failed to bind to HTTP API address: {address}"); + }; + + spawn(async move { + info!("Started {NAME} HTTP API on: {address}"); + if let Err(error) = axum::serve(listener, app.into_make_service()).await { + panic!("Failed to start {NAME} HTTP API on: {address}. {error}"); + } + }); +} + +async fn get_sinks( + State(context): State<Arc<RuntimeContext>>, +) -> Result<Json<Vec<SinkInfoResponse>>, ApiError> { + let sinks = context + .sinks + .get_all() + .into_iter() + .map(|(key, sink)| SinkInfoResponse { + id: sink.id, + key: key.to_owned(), + name: sink.name.to_owned(), + path: sink.path.to_owned(), + enabled: sink.enabled, + running: sink.running, + }) + .collect::<Vec<_>>(); + Ok(Json(sinks)) +} + +async fn get_sink( + State(context): State<Arc<RuntimeContext>>, + Path(key): Path<String>, +) -> Result<Json<SinkDetailsResponse>, ApiError> { + let Some(sink) = context.sinks.get(&key) else { + return Err(ApiError::Error(RuntimeError::SinkNotFound(key))); + }; + let sink = SinkDetailsResponse { + info: SinkInfoResponse { + id: sink.info.id, + key: sink.key.to_owned(), + name: sink.info.name.to_owned(), + path: sink.info.path.to_owned(), + enabled: sink.info.enabled, + running: sink.info.running, + }, + }; + Ok(Json(sink)) +} + +async fn get_sink_config( + State(context): State<Arc<RuntimeContext>>, + Path(key): Path<String>, +) -> Result<Json<Option<serde_json::Value>>, ApiError> { + let Some(sink) = context.sinks.get(&key) else { + return Err(ApiError::Error(RuntimeError::SinkNotFound(key))); + }; + Ok(Json(sink.config.clone())) +} + +async fn get_sources( + State(context): State<Arc<RuntimeContext>>, +) -> Result<Json<Vec<SourceInfoResponse>>, ApiError> { + let sources = context + .sources + .get_all() + .into_iter() + .map(|(key, source)| SourceInfoResponse { + id: source.id, + key: key.to_owned(), + name: source.name.to_owned(), + path: source.path.to_owned(), + enabled: source.enabled, + running: source.running, + }) + .collect::<Vec<_>>(); + Ok(Json(sources)) +} + +async fn get_source( + State(context): State<Arc<RuntimeContext>>, + Path(key): Path<String>, +) -> Result<Json<SourceDetailsResponse>, ApiError> { + let Some(source) = context.sources.get(&key) else { + return Err(ApiError::Error(RuntimeError::SourceNotFound(key))); + }; + let source = SourceDetailsResponse { + info: SourceInfoResponse { + id: source.info.id, + key: source.key.to_owned(), + name: source.info.name.to_owned(), + path: source.info.path.to_owned(), + enabled: source.info.enabled, + running: source.info.running, + }, + }; + Ok(Json(source)) +} + +async fn get_source_config( + State(context): State<Arc<RuntimeContext>>, + Path(key): Path<String>, +) -> Result<Json<Option<serde_json::Value>>, ApiError> { + let Some(sink) = context.sources.get(&key) else { + return Err(ApiError::Error(RuntimeError::SourceNotFound(key))); + }; + Ok(Json(sink.config.clone())) +} + +#[derive(Debug, Error)] +enum ApiError { + #[error(transparent)] + Error(#[from] RuntimeError), + #[error(transparent)] + JsonError(#[from] serde_json::Error), +} + +#[derive(Debug, Serialize)] +pub struct ErrorResponse { + pub code: String, + pub reason: String, +} + +impl IntoResponse for ApiError { + fn into_response(self) -> axum::response::Response { + match self { + ApiError::Error(error) => { + error!("There was an error: {error}"); + let status_code = match error { + RuntimeError::MissingIggyCredentials => StatusCode::BAD_REQUEST, + RuntimeError::InvalidConfiguration(_) => StatusCode::BAD_REQUEST, + RuntimeError::SinkNotFound(_) => StatusCode::NOT_FOUND, + RuntimeError::SourceNotFound(_) => StatusCode::NOT_FOUND, + _ => StatusCode::INTERNAL_SERVER_ERROR, + }; + ( + status_code, + Json(ErrorResponse { + code: error.as_code().to_owned(), + reason: error.to_string(), + }), + ) + } + ApiError::JsonError(error) => { + error!("There was a JSON error: {error}"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse { + code: "json_error".to_owned(), + reason: error.to_string(), + }), + ) + } + } + .into_response() + } +} diff --git a/core/connectors/runtime/src/configs.rs b/core/connectors/runtime/src/configs.rs index aa6d8fa8..b1ecb504 100644 --- a/core/connectors/runtime/src/configs.rs +++ b/core/connectors/runtime/src/configs.rs @@ -20,14 +20,17 @@ use iggy_connector_sdk::{Schema, transforms::TransformType}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -#[derive(Debug, Deserialize, Serialize)] +use crate::api::HttpApiConfig; + +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct RuntimeConfig { + pub http_api: HttpApiConfig, pub iggy: IggyConfig, pub sinks: HashMap<String, SinkConfig>, pub sources: HashMap<String, SourceConfig>, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct IggyConfig { pub address: String, pub username: Option<String>, @@ -35,7 +38,7 @@ pub struct IggyConfig { pub token: Option<String>, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct SinkConfig { pub enabled: bool, pub name: String, @@ -45,7 +48,7 @@ pub struct SinkConfig { pub config: Option<serde_json::Value>, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct StreamConsumerConfig { pub stream: String, pub topics: Vec<String>, @@ -55,7 +58,7 @@ pub struct StreamConsumerConfig { pub consumer_group: Option<String>, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct StreamProducerConfig { pub stream: String, pub topic: String, @@ -64,7 +67,7 @@ pub struct StreamProducerConfig { pub send_interval: Option<String>, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct SourceConfig { pub enabled: bool, pub name: String, @@ -74,13 +77,13 @@ pub struct SourceConfig { pub config: Option<serde_json::Value>, } -#[derive(Debug, Default, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct TransformsConfig { #[serde(flatten)] pub transforms: HashMap<TransformType, serde_json::Value>, } -#[derive(Debug, Default, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct SharedTransformConfig { pub enabled: bool, } diff --git a/core/connectors/runtime/src/context.rs b/core/connectors/runtime/src/context.rs new file mode 100644 index 00000000..c2a3f194 --- /dev/null +++ b/core/connectors/runtime/src/context.rs @@ -0,0 +1,88 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::{ + SinkConnectorWrapper, SourceConnectorWrapper, + configs::RuntimeConfig, + manager::{ + sink::{SinkDetails, SinkInfo, SinkManager}, + source::{SourceDetails, SourceInfo, SourceManager}, + }, +}; + +use tracing::error; + +pub struct RuntimeContext { + pub sinks: SinkManager, + pub sources: SourceManager, +} + +pub fn init( + config: &RuntimeConfig, + sink_wrappers: &[SinkConnectorWrapper], + source_wrappers: &[SourceConnectorWrapper], +) -> RuntimeContext { + let mut sinks = vec![]; + for sink_wrapper in sink_wrappers.iter() { + for sink_plugin in sink_wrapper.plugins.iter() { + let Some(sink_config) = config.sinks.get(&sink_plugin.key) else { + error!("Missing sink config for: {}", sink_plugin.key); + continue; + }; + + sinks.push(SinkDetails { + key: sink_plugin.key.to_owned(), + info: SinkInfo { + id: sink_plugin.id, + name: sink_plugin.name.to_owned(), + path: sink_plugin.path.to_owned(), + enabled: sink_config.enabled, + running: true, + }, + config: sink_config.config.clone(), + }); + } + } + + let mut sources = vec![]; + for source_wrapper in source_wrappers.iter() { + for source_plugin in source_wrapper.plugins.iter() { + let Some(source_config) = config.sources.get(&source_plugin.key) else { + error!("Missing source config for: {}", source_plugin.key); + continue; + }; + + sources.push(SourceDetails { + key: source_plugin.key.to_owned(), + info: SourceInfo { + id: source_plugin.id, + name: source_plugin.name.to_owned(), + path: source_plugin.path.to_owned(), + enabled: source_config.enabled, + running: true, + }, + config: source_config.config.clone(), + }); + } + } + + RuntimeContext { + sinks: SinkManager::new(sinks), + sources: SourceManager::new(sources), + } +} diff --git a/core/connectors/runtime/src/error.rs b/core/connectors/runtime/src/error.rs index 702353e8..8d26ef0f 100644 --- a/core/connectors/runtime/src/error.rs +++ b/core/connectors/runtime/src/error.rs @@ -38,4 +38,22 @@ pub enum RuntimeError { IggyError(#[from] iggy::prelude::IggyError), #[error("Missing Iggy credentials")] MissingIggyCredentials, + #[error("JSON error")] + JsonError(#[from] serde_json::Error), + #[error("Sink not found with key: {0}")] + SinkNotFound(String), + #[error("Source not found with key: {0}")] + SourceNotFound(String), +} + +impl RuntimeError { + pub fn as_code(&self) -> &'static str { + match self { + RuntimeError::SinkNotFound(_) => "sink_not_found", + RuntimeError::SourceNotFound(_) => "source_not_found", + RuntimeError::MissingIggyCredentials => "invalid_configuration", + RuntimeError::InvalidConfiguration(_) => "invalid_configuration", + _ => "error", + } + } } diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs index f87057bf..f3737c93 100644 --- a/core/connectors/runtime/src/main.rs +++ b/core/connectors/runtime/src/main.rs @@ -36,8 +36,11 @@ use std::{ use tracing::{debug, error, info}; use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, util::SubscriberInitExt}; +mod api; pub(crate) mod configs; +pub(crate) mod context; pub(crate) mod error; +mod manager; mod sink; mod source; mod transform; @@ -91,10 +94,11 @@ async fn main() -> Result<(), RuntimeError> { .try_deserialize() .expect("Failed to deserialize runtime config"); - let iggy_address = config.iggy.address; - let iggy_username = config.iggy.username; - let iggy_password = config.iggy.password; - let iggy_token = config.iggy.token; + let iggy_config = config.iggy.clone(); + let iggy_address = iggy_config.address; + let iggy_username = iggy_config.username; + let iggy_password = iggy_config.password; + let iggy_token = iggy_config.token; let consumer_client = create_iggy_client( &iggy_address, iggy_username.as_deref(), @@ -112,8 +116,8 @@ async fn main() -> Result<(), RuntimeError> { .await?; producer_client.connect().await?; - let sources = source::init(config.sources, &producer_client).await?; - let sinks = sink::init(config.sinks, &consumer_client).await?; + let sources = source::init(config.sources.clone(), &producer_client).await?; + let sinks = sink::init(config.sinks.clone(), &consumer_client).await?; let mut sink_wrappers = vec![]; let mut sink_with_plugins = HashMap::new(); @@ -149,6 +153,10 @@ async fn main() -> Result<(), RuntimeError> { ); } + let context = context::init(&config, &sink_wrappers, &source_wrappers); + let context = Arc::new(context); + api::init(&config.http_api, context).await; + source::handle(source_wrappers); sink::consume(sink_wrappers); info!("All sources and sinks spawned."); @@ -261,6 +269,9 @@ struct SinkConnector { struct SinkConnectorPlugin { id: u32, + key: String, + name: String, + path: String, consumers: Vec<SinkConnectorConsumer>, } @@ -288,6 +299,9 @@ struct SourceConnector { struct SourceConnectorPlugin { id: u32, + key: String, + name: String, + path: String, transforms: Vec<Arc<dyn Transform>>, producer: Option<SourceConnectorProducer>, } diff --git a/core/connectors/runtime/src/manager/mod.rs b/core/connectors/runtime/src/manager/mod.rs new file mode 100644 index 00000000..8efc27b1 --- /dev/null +++ b/core/connectors/runtime/src/manager/mod.rs @@ -0,0 +1,20 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +pub mod sink; +pub mod source; diff --git a/core/connectors/runtime/src/manager/sink.rs b/core/connectors/runtime/src/manager/sink.rs new file mode 100644 index 00000000..db950e45 --- /dev/null +++ b/core/connectors/runtime/src/manager/sink.rs @@ -0,0 +1,79 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +#[derive(Debug)] +pub struct SinkManager { + sinks: HashMap<String, SinkDetails>, +} + +impl SinkManager { + pub fn new(sinks: Vec<SinkDetails>) -> Self { + Self { + sinks: sinks + .into_iter() + .map(|sink| (sink.key.to_owned(), sink)) + .collect(), + } + } + + pub fn get(&self, key: &str) -> Option<&SinkDetails> { + self.sinks.get(key) + } + + pub fn get_all(&self) -> HashMap<&String, &SinkInfo> { + self.sinks + .iter() + .map(|(key, sink)| (key, &sink.info)) + .collect() + } +} + +#[derive(Debug)] +pub struct SinkDetails { + pub key: String, + pub info: SinkInfo, + pub config: Option<serde_json::Value>, +} + +#[derive(Debug)] +pub struct SinkInfo { + pub id: u32, + pub name: String, + pub path: String, + pub enabled: bool, + pub running: bool, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct SinkInfoResponse { + pub id: u32, + pub key: String, + pub name: String, + pub path: String, + pub enabled: bool, + pub running: bool, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct SinkDetailsResponse { + #[serde(flatten)] + pub info: SinkInfoResponse, +} diff --git a/core/connectors/runtime/src/manager/source.rs b/core/connectors/runtime/src/manager/source.rs new file mode 100644 index 00000000..6ef774d8 --- /dev/null +++ b/core/connectors/runtime/src/manager/source.rs @@ -0,0 +1,79 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +#[derive(Debug)] +pub struct SourceManager { + sources: HashMap<String, SourceDetails>, +} + +impl SourceManager { + pub fn new(sources: Vec<SourceDetails>) -> Self { + Self { + sources: sources + .into_iter() + .map(|source| (source.key.to_owned(), source)) + .collect(), + } + } + + pub fn get(&self, key: &str) -> Option<&SourceDetails> { + self.sources.get(key) + } + + pub fn get_all(&self) -> HashMap<&String, &SourceInfo> { + self.sources + .iter() + .map(|(key, source)| (key, &source.info)) + .collect() + } +} + +#[derive(Debug)] +pub struct SourceDetails { + pub key: String, + pub info: SourceInfo, + pub config: Option<serde_json::Value>, +} + +#[derive(Debug)] +pub struct SourceInfo { + pub id: u32, + pub name: String, + pub path: String, + pub enabled: bool, + pub running: bool, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct SourceInfoResponse { + pub id: u32, + pub key: String, + pub name: String, + pub path: String, + pub enabled: bool, + pub running: bool, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct SourceDetailsResponse { + #[serde(flatten)] + pub info: SourceInfoResponse, +} diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs index 17b014eb..6f6946a7 100644 --- a/core/connectors/runtime/src/sink.rs +++ b/core/connectors/runtime/src/sink.rs @@ -63,6 +63,9 @@ pub async fn init( ); container.plugins.push(SinkConnectorPlugin { id: plugin_id, + key: key.to_owned(), + name: name.to_owned(), + path: path.to_owned(), consumers: vec![], }); } else { @@ -76,6 +79,9 @@ pub async fn init( container, plugins: vec![SinkConnectorPlugin { id: plugin_id, + key: key.to_owned(), + name: name.to_owned(), + path: path.to_owned(), consumers: vec![], }], }, diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs index 9cde82ca..bdc00123 100644 --- a/core/connectors/runtime/src/source.rs +++ b/core/connectors/runtime/src/source.rs @@ -63,6 +63,9 @@ pub async fn init( ); container.plugins.push(SourceConnectorPlugin { id: plugin_id, + key: key.to_owned(), + name: name.to_owned(), + path: path.to_owned(), producer: None, transforms: vec![], }); @@ -77,6 +80,9 @@ pub async fn init( container, plugins: vec![SourceConnectorPlugin { id: plugin_id, + key: key.to_owned(), + name: name.to_owned(), + path: path.to_owned(), producer: None, transforms: vec![], }],
