This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cb79069d feat(connectors): extend connectors' status reporting (#2434)
8cb79069d is described below

commit 8cb79069d8d9d03f714792b421814db50e5b79ce
Author: Maciej Modzelewski <[email protected]>
AuthorDate: Tue Dec 2 14:56:20 2025 +0100

    feat(connectors): extend connectors' status reporting (#2434)
---
 Cargo.lock                                    | 26 +++++----
 Cargo.toml                                    |  2 +-
 DEPENDENCIES.md                               |  9 +--
 core/ai/mcp/Cargo.toml                        |  2 +-
 core/connectors/runtime/src/api/models.rs     | 20 +++++--
 core/connectors/runtime/src/context.rs        | 30 +++++++++-
 core/connectors/runtime/src/main.rs           |  8 ++-
 core/connectors/runtime/src/manager/mod.rs    |  4 +-
 core/connectors/runtime/src/manager/sink.rs   | 22 ++++++-
 core/connectors/runtime/src/manager/source.rs | 22 ++++++-
 core/connectors/runtime/src/manager/status.rs | 65 +++++++++++++++++++++
 core/connectors/runtime/src/sink.rs           | 70 ++++++++++++++++++-----
 core/connectors/runtime/src/source.rs         | 82 ++++++++++++++++++++++-----
 core/integration/Cargo.toml                   |  2 +-
 14 files changed, 307 insertions(+), 57 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 9a1814068..1998d3f78 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6631,6 +6631,12 @@ version = "1.0.15"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
 
+[[package]]
+name = "pastey"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "57d6c094ee800037dff99e02cab0eaf3142826586742a270ab3d7a62656bd27a"
+
 [[package]]
 name = "pbkdf2"
 version = "0.12.2"
@@ -7098,9 +7104,9 @@ dependencies = [
 
 [[package]]
 name = "prost-reflect"
-version = "0.16.2"
+version = "0.16.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "89a3ac73ec9a9118131a4594c9d336631a07852220a1d0ae03ee36b04503a063"
+checksum = "b89455ef41ed200cafc47c76c552ee7792370ac420497e551f16123a9135f76e"
 dependencies = [
  "logos",
  "miette",
@@ -7658,9 +7664,9 @@ checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422"
 
 [[package]]
 name = "rmcp"
-version = "0.9.1"
+version = "0.10.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "eaa07b85b779d1e1df52dd79f6c6bffbe005b191f07290136cc42a142da3409a"
+checksum = "38b18323edc657390a6ed4d7a9110b0dec2dc3ed128eb2a123edfbafabdbddc5"
 dependencies = [
  "async-trait",
  "axum",
@@ -7671,7 +7677,7 @@ dependencies = [
  "http 1.4.0",
  "http-body",
  "http-body-util",
- "paste",
+ "pastey",
  "pin-project-lite",
  "rand 0.9.2",
  "reqwest",
@@ -7691,9 +7697,9 @@ dependencies = [
 
 [[package]]
 name = "rmcp-macros"
-version = "0.9.1"
+version = "0.10.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0f6fa09933cac0d0204c8a5d647f558425538ed6a0134b1ebb1ae4dc00c96db3"
+checksum = "c75d0a62676bf8c8003c4e3c348e2ceb6a7b3e48323681aaf177fdccdac2ce50"
 dependencies = [
  "darling 0.21.3",
  "proc-macro2",
@@ -9922,14 +9928,14 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
 
 [[package]]
 name = "uuid"
-version = "1.18.1"
+version = "1.19.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2"
+checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a"
 dependencies = [
  "getrandom 0.3.4",
  "js-sys",
  "rand 0.9.2",
- "serde",
+ "serde_core",
  "wasm-bindgen",
  "zerocopy",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index 800ae36ac..355be9d16 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -196,7 +196,7 @@ tracing-subscriber = { version = "0.3.22", default-features = false, features =
 trait-variant = "0.1.2"
 tungstenite = "0.28.0"
 twox-hash = { version = "2.1.2", features = ["xxhash32"] }
-uuid = { version = "1.18.1", features = [
+uuid = { version = "1.19.0", features = [
     "v4",
     "v7",
     "fast-rng",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index f9ba6516f..239ffed5a 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -577,6 +577,7 @@ parse-display-derive: 0.9.1, "Apache-2.0 OR MIT",
 passterm: 2.0.1, "BSD-3-Clause",
 password-hash: 0.5.0, "Apache-2.0 OR MIT",
 paste: 1.0.15, "Apache-2.0 OR MIT",
+pastey: 0.2.0, "Apache-2.0 OR MIT",
 pbkdf2: 0.12.2, "Apache-2.0 OR MIT",
 pear: 0.2.9, "Apache-2.0 OR MIT",
 pear_codegen: 0.2.9, "Apache-2.0 OR MIT",
@@ -624,7 +625,7 @@ prometheus-client: 0.24.0, "Apache-2.0 OR MIT",
 prometheus-client-derive-encode: 0.5.0, "Apache-2.0 OR MIT",
 prost: 0.14.1, "Apache-2.0",
 prost-derive: 0.14.1, "Apache-2.0",
-prost-reflect: 0.16.2, "Apache-2.0 OR MIT",
+prost-reflect: 0.16.3, "Apache-2.0 OR MIT",
 prost-types: 0.14.1, "Apache-2.0",
 protox: 0.9.0, "Apache-2.0 OR MIT",
 protox-parse: 0.9.0, "Apache-2.0 OR MIT",
@@ -671,8 +672,8 @@ ringbuffer: 0.16.0, "MIT",
 rkyv: 0.7.45, "MIT",
 rkyv_derive: 0.7.45, "MIT",
 rle-decode-fast: 1.0.3, "Apache-2.0 OR MIT",
-rmcp: 0.9.1, "MIT",
-rmcp-macros: 0.9.1, "MIT",
+rmcp: 0.10.0, "MIT",
+rmcp-macros: 0.10.0, "MIT",
 roaring: 0.10.12, "Apache-2.0 OR MIT",
 route-recognizer: 0.3.1, "MIT",
 rsa: 0.9.9, "Apache-2.0 OR MIT",
@@ -870,7 +871,7 @@ utf-8: 0.7.6, "Apache-2.0 OR MIT",
 utf8-width: 0.1.8, "MIT",
 utf8_iter: 1.0.4, "Apache-2.0 OR MIT",
 utf8parse: 0.2.2, "Apache-2.0 OR MIT",
-uuid: 1.18.1, "Apache-2.0 OR MIT",
+uuid: 1.19.0, "Apache-2.0 OR MIT",
 v_htmlescape: 0.15.8, "Apache-2.0 OR MIT",
 valuable: 0.1.1, "MIT",
 value-trait: 0.12.1, "Apache-2.0 OR MIT",
diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml
index 5d47536ca..526517d1e 100644
--- a/core/ai/mcp/Cargo.toml
+++ b/core/ai/mcp/Cargo.toml
@@ -34,7 +34,7 @@ figlet-rs = { workspace = true }
 figment = { workspace = true }
 iggy = { workspace = true }
 iggy_common = { workspace = true }
-rmcp = { version = "0.9.1", features = [
+rmcp = { version = "0.10.0", features = [
     "server",
     "transport-io",
     "transport-sse-server",
diff --git a/core/connectors/runtime/src/api/models.rs b/core/connectors/runtime/src/api/models.rs
index f8832dfd1..2ce4176e6 100644
--- a/core/connectors/runtime/src/api/models.rs
+++ b/core/connectors/runtime/src/api/models.rs
@@ -20,7 +20,11 @@
 use crate::configs::connectors::{
     ConfigFormat, SinkConfig, SourceConfig, StreamConsumerConfig, StreamProducerConfig,
 };
-use crate::manager::{sink::SinkInfo, source::SourceInfo};
+use crate::manager::{
+    sink::SinkInfo,
+    source::SourceInfo,
+    status::{ConnectorError, ConnectorStatus},
+};
 use iggy_connector_sdk::transforms::TransformType;
 use serde::{Deserialize, Serialize};
 
@@ -31,7 +35,9 @@ pub struct SinkInfoResponse {
     pub name: String,
     pub path: String,
     pub enabled: bool,
-    pub running: bool,
+    pub status: ConnectorStatus,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub last_error: Option<ConnectorError>,
     pub plugin_config_format: Option<ConfigFormat>,
 }
 
@@ -56,7 +62,9 @@ pub struct SourceInfoResponse {
     pub name: String,
     pub path: String,
     pub enabled: bool,
-    pub running: bool,
+    pub status: ConnectorStatus,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub last_error: Option<ConnectorError>,
     pub plugin_config_format: Option<ConfigFormat>,
 }
 
@@ -88,7 +96,8 @@ impl From<SinkInfo> for SinkInfoResponse {
             name: sink.name,
             path: sink.path,
             enabled: sink.enabled,
-            running: sink.running,
+            status: sink.status,
+            last_error: sink.last_error,
             plugin_config_format: sink.plugin_config_format,
         }
     }
@@ -102,7 +111,8 @@ impl From<SourceInfo> for SourceInfoResponse {
             name: source.name,
             path: source.path,
             enabled: source.enabled,
-            running: source.running,
+            status: source.status,
+            last_error: source.last_error,
             plugin_config_format: source.plugin_config_format,
         }
     }
diff --git a/core/connectors/runtime/src/context.rs b/core/connectors/runtime/src/context.rs
index ef9b18910..fe2a8e034 100644
--- a/core/connectors/runtime/src/context.rs
+++ b/core/connectors/runtime/src/context.rs
@@ -18,11 +18,13 @@
  */
 use crate::configs::connectors::{ConnectorsConfigProvider, SinkConfig, SourceConfig};
 use crate::configs::runtime::ConnectorsRuntimeConfig;
+use crate::manager::status::ConnectorError;
 use crate::{
     SinkConnectorWrapper, SourceConnectorWrapper,
     manager::{
         sink::{SinkDetails, SinkInfo, SinkManager},
         source::{SourceDetails, SourceInfo, SourceManager},
+        status::ConnectorStatus,
     },
 };
 use std::collections::HashMap;
@@ -64,6 +66,14 @@ fn map_sinks(
                 continue;
             };
 
+            let status = if sink_plugin.error.is_some() {
+                ConnectorStatus::Error
+            } else if sink_config.enabled {
+                ConnectorStatus::Starting
+            } else {
+                ConnectorStatus::Stopped
+            };
+
             sinks.push(SinkDetails {
                 info: SinkInfo {
                     id: sink_plugin.id,
@@ -71,7 +81,11 @@ fn map_sinks(
                     name: sink_plugin.name.to_owned(),
                     path: sink_plugin.path.to_owned(),
                     enabled: sink_config.enabled,
-                    running: sink_config.enabled,
+                    status,
+                    last_error: sink_plugin
+                        .error
+                        .as_ref()
+                        .map(|err| ConnectorError::new(&err.to_string())),
                     plugin_config_format: sink_plugin.config_format,
                 },
                 config: sink_config.clone(),
@@ -93,6 +107,14 @@ fn map_sources(
                 continue;
             };
 
+            let status = if source_plugin.error.is_some() {
+                ConnectorStatus::Error
+            } else if source_config.enabled {
+                ConnectorStatus::Starting
+            } else {
+                ConnectorStatus::Stopped
+            };
+
             sources.push(SourceDetails {
                 info: SourceInfo {
                     id: source_plugin.id,
@@ -100,7 +122,11 @@ fn map_sources(
                     name: source_plugin.name.to_owned(),
                     path: source_plugin.path.to_owned(),
                     enabled: source_config.enabled,
-                    running: source_config.enabled,
+                    status,
+                    last_error: source_plugin
+                        .error
+                        .as_ref()
+                        .map(|err| ConnectorError::new(&err.to_string())),
                     plugin_config_format: source_plugin.config_format,
                 },
                 config: source_config.clone(),
diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs
index 8da7add28..d1990d80f 100644
--- a/core/connectors/runtime/src/main.rs
+++ b/core/connectors/runtime/src/main.rs
@@ -189,10 +189,10 @@ async fn main() -> Result<(), RuntimeError> {
         connectors_config_provider,
     );
     let context = Arc::new(context);
-    api::init(&config.http, context).await;
+    api::init(&config.http, context.clone()).await;
 
-    source::handle(source_wrappers);
-    sink::consume(sink_wrappers);
+    source::handle(source_wrappers, context.clone());
+    sink::consume(sink_wrappers, context.clone());
     info!("All sources and sinks spawned.");
 
     #[cfg(unix)]
@@ -265,6 +265,7 @@ struct SinkConnectorPlugin {
     path: String,
     config_format: Option<ConfigFormat>,
     consumers: Vec<SinkConnectorConsumer>,
+    error: Option<String>,
 }
 
 struct SinkConnectorConsumer {
@@ -298,6 +299,7 @@ struct SourceConnectorPlugin {
     transforms: Vec<Arc<dyn Transform>>,
     producer: Option<SourceConnectorProducer>,
     state_storage: StateStorage,
+    error: Option<String>,
 }
 
 struct SourceConnectorProducer {
diff --git a/core/connectors/runtime/src/manager/mod.rs b/core/connectors/runtime/src/manager/mod.rs
index 8efc27b1a..475050c8a 100644
--- a/core/connectors/runtime/src/manager/mod.rs
+++ b/core/connectors/runtime/src/manager/mod.rs
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
@@ -18,3 +19,4 @@
 
 pub mod sink;
 pub mod source;
+pub mod status;
diff --git a/core/connectors/runtime/src/manager/sink.rs b/core/connectors/runtime/src/manager/sink.rs
index 65d594fa0..93ad280bd 100644
--- a/core/connectors/runtime/src/manager/sink.rs
+++ b/core/connectors/runtime/src/manager/sink.rs
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use super::status::{ConnectorError, ConnectorStatus};
 use crate::configs::connectors::{ConfigFormat, SinkConfig};
 use dashmap::DashMap;
 use std::collections::HashMap;
@@ -61,6 +62,24 @@ impl SinkManager {
         }
         results
     }
+
+    pub async fn update_status(&self, key: &str, status: ConnectorStatus) {
+        if let Some(sink) = self.sinks.get(key) {
+            let mut sink = sink.lock().await;
+            sink.info.status = status;
+            if matches!(status, ConnectorStatus::Running | ConnectorStatus::Stopped) {
+                sink.info.last_error = None;
+            }
+        }
+    }
+
+    pub async fn set_error(&self, key: &str, error_message: &str) {
+        if let Some(sink) = self.sinks.get(key) {
+            let mut sink = sink.lock().await;
+            sink.info.status = ConnectorStatus::Error;
+            sink.info.last_error = Some(ConnectorError::new(error_message));
+        }
+    }
 }
 
 #[derive(Debug, Clone)]
@@ -70,7 +89,8 @@ pub struct SinkInfo {
     pub name: String,
     pub path: String,
     pub enabled: bool,
-    pub running: bool,
+    pub status: ConnectorStatus,
+    pub last_error: Option<ConnectorError>,
     pub plugin_config_format: Option<ConfigFormat>,
 }
 
diff --git a/core/connectors/runtime/src/manager/source.rs b/core/connectors/runtime/src/manager/source.rs
index 258151837..801a567e7 100644
--- a/core/connectors/runtime/src/manager/source.rs
+++ b/core/connectors/runtime/src/manager/source.rs
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use super::status::{ConnectorError, ConnectorStatus};
 use crate::configs::connectors::{ConfigFormat, SourceConfig};
 use dashmap::DashMap;
 use std::collections::HashMap;
@@ -61,6 +62,24 @@ impl SourceManager {
         }
         results
     }
+
+    pub async fn update_status(&self, key: &str, status: ConnectorStatus) {
+        if let Some(source) = self.sources.get(key) {
+            let mut source = source.lock().await;
+            source.info.status = status;
+            if matches!(status, ConnectorStatus::Running | ConnectorStatus::Stopped) {
+                source.info.last_error = None;
+            }
+        }
+    }
+
+    pub async fn set_error(&self, key: &str, error_message: &str) {
+        if let Some(source) = self.sources.get(key) {
+            let mut source = source.lock().await;
+            source.info.status = ConnectorStatus::Error;
+            source.info.last_error = Some(ConnectorError::new(error_message));
+        }
+    }
 }
 
 #[derive(Debug, Clone)]
@@ -70,7 +89,8 @@ pub struct SourceInfo {
     pub name: String,
     pub path: String,
     pub enabled: bool,
-    pub running: bool,
+    pub status: ConnectorStatus,
+    pub last_error: Option<ConnectorError>,
     pub plugin_config_format: Option<ConfigFormat>,
 }
 
diff --git a/core/connectors/runtime/src/manager/status.rs b/core/connectors/runtime/src/manager/status.rs
new file mode 100644
index 000000000..00dacf79d
--- /dev/null
+++ b/core/connectors/runtime/src/manager/status.rs
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use iggy_common::IggyTimestamp;
+use serde::{Deserialize, Serialize};
+use strum::{AsRefStr, Display as StrumDisplay, EnumString};
+
+#[derive(
+    Debug,
+    Serialize,
+    Deserialize,
+    PartialEq,
+    Clone,
+    Copy,
+    AsRefStr,
+    StrumDisplay,
+    EnumString,
+    Default,
+)]
+#[serde(rename_all = "lowercase")]
+#[strum(serialize_all = "lowercase")]
+#[repr(u8)]
+pub enum ConnectorStatus {
+    /// Connector is initializing
+    Starting,
+    /// Connector is running normally
+    Running,
+    /// Connector is shutting down
+    Stopping,
+    /// Connector is stopped (disabled or shut down cleanly)
+    #[default]
+    Stopped,
+    /// Connector has encountered an error
+    Error,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ConnectorError {
+    pub message: String,
+    pub timestamp: IggyTimestamp,
+}
+
+impl ConnectorError {
+    pub fn new(message: &str) -> Self {
+        Self {
+            message: message.to_string(),
+            timestamp: IggyTimestamp::now(),
+        }
+    }
+}
diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs
index 6b1c9970c..abd04971e 100644
--- a/core/connectors/runtime/src/sink.rs
+++ b/core/connectors/runtime/src/sink.rs
@@ -18,6 +18,8 @@
  */
 
 use crate::configs::connectors::SinkConfig;
+use crate::context::RuntimeContext;
+use crate::manager::status::ConnectorStatus;
 use crate::{
     PLUGIN_ID, RuntimeError, SinkApi, SinkConnector, SinkConnectorConsumer, SinkConnectorPlugin,
     SinkConnectorWrapper, resolve_plugin_path, transform,
@@ -58,13 +60,16 @@ pub async fn init(
             "Initializing sink container with name: {name} ({key}), config version: {}, plugin: {path}",
             &config.version
         );
+        let init_error: Option<String>;
         if let Some(container) = sink_connectors.get_mut(&path) {
             info!("Sink container for plugin: {path} is already loaded.",);
-            init_sink(
+            init_error = init_sink(
                 &container.container,
                 &config.plugin_config.unwrap_or_default(),
                 plugin_id,
-            );
+            )
+            .err()
+            .map(|error| error.to_string());
             container.plugins.push(SinkConnectorPlugin {
                 id: plugin_id,
                 key: key.to_owned(),
@@ -72,16 +77,19 @@ pub async fn init(
                 path: path.to_owned(),
                 config_format: config.plugin_config_format,
                 consumers: vec![],
+                error: init_error.clone(),
             });
         } else {
             let container: Container<SinkApi> =
                 unsafe { Container::load(&path).expect("Failed to load sink container") };
             info!("Sink container for plugin: {path} loaded successfully.",);
-            init_sink(
+            init_error = init_sink(
                 &container,
                 &config.plugin_config.unwrap_or_default(),
                 plugin_id,
-            );
+            )
+            .err()
+            .map(|error| error.to_string());
             sink_connectors.insert(
                 path.to_owned(),
                 SinkConnector {
@@ -93,14 +101,20 @@ pub async fn init(
                         path: path.to_owned(),
                         config_format: config.plugin_config_format,
                         consumers: vec![],
+                        error: init_error.clone(),
                     }],
                 },
             );
         }
 
-        info!(
-            "Sink container with name: {name} ({key}), initialized successfully with ID: {plugin_id}."
-        );
+        if let Some(error) = init_error {
+            error!("Failed to initialize sink container with name: {name} ({key}). {error}");
+            continue;
+        } else {
+            info!(
+                "Sink container with name: {name} ({key}), initialized successfully with ID: {plugin_id}."
+            );
+        }
         PLUGIN_ID.fetch_add(1, Ordering::Relaxed);
 
         let transforms = if let Some(transforms_config) = config.transforms {
@@ -161,12 +175,29 @@ pub async fn init(
     Ok(sink_connectors)
 }
 
-pub fn consume(sinks: Vec<SinkConnectorWrapper>) {
+pub fn consume(sinks: Vec<SinkConnectorWrapper>, context: Arc<RuntimeContext>) {
     for sink in sinks {
         for plugin in sink.plugins {
-            info!("Starting consume for sink with ID: {}...", plugin.id);
+            if plugin.error.is_none() {
+                info!("Starting consume for sink with ID: {}...", plugin.id);
+            } else {
+                error!(
+                    "Failed to initialize sink connector with ID: {}: {}. Skipping...",
+                    plugin.id,
+                    plugin.error.as_ref().expect("Error should be present")
+                );
+                continue;
+            }
             for consumer in plugin.consumers {
+                let plugin_key = plugin.key.clone();
+                let context = context.clone();
+
                 tokio::spawn(async move {
+                    context
+                        .sinks
+                        .update_status(&plugin_key, ConnectorStatus::Running)
+                        .await;
+
                     if let Err(error) = consume_messages(
                         plugin.id,
                         consumer.decoder,
@@ -177,10 +208,12 @@ pub fn consume(sinks: Vec<SinkConnectorWrapper>) {
                     )
                     .await
                     {
-                        error!(
+                        let err = format!(
                             "Failed to consume messages for sink connector with ID: {}. {error}",
                             plugin.id
                         );
+                        error!(err);
+                        context.sinks.set_error(&plugin_key, &err).await;
                         return;
                     }
                     info!(
@@ -249,7 +282,7 @@ async fn consume_messages(
             error!(
                 "Failed to process {messages_count} messages for sink connector with ID: {plugin_id}. {error}",
             );
-            continue;
+            return Err(error);
         }
 
         let elapsed = start.elapsed();
@@ -262,9 +295,20 @@ async fn consume_messages(
     Ok(())
 }
 
-fn init_sink(container: &Container<SinkApi>, plugin_config: &serde_json::Value, id: u32) {
+fn init_sink(
+    container: &Container<SinkApi>,
+    plugin_config: &serde_json::Value,
+    id: u32,
+) -> Result<(), RuntimeError> {
     let plugin_config = serde_json::to_string(plugin_config).expect("Invalid sink plugin config.");
-    (container.open)(id, plugin_config.as_ptr(), plugin_config.len());
+    let result = (container.open)(id, plugin_config.as_ptr(), plugin_config.len());
+    if result != 0 {
+        let err = format!("Plugin initialization failed (ID: {id})");
+        error!("{err}");
+        Err(RuntimeError::InvalidConfiguration(err))
+    } else {
+        Ok(())
+    }
 }
 
 async fn process_messages(
diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs
index 462ae4487..26af0eaff 100644
--- a/core/connectors/runtime/src/source.rs
+++ b/core/connectors/runtime/src/source.rs
@@ -36,6 +36,8 @@ use std::{
 use tracing::{debug, error, info, trace, warn};
 
 use crate::configs::connectors::SourceConfig;
+use crate::context::RuntimeContext;
+use crate::manager::status::ConnectorStatus;
 use crate::{
     PLUGIN_ID, RuntimeError, SourceApi, SourceConnector, SourceConnectorPlugin,
     SourceConnectorProducer, SourceConnectorWrapper, resolve_plugin_path,
@@ -68,14 +70,17 @@ pub async fn init(
         let state = match &state_storage {
             StateStorage::File(file) => file.load().await?,
         };
+        let init_error: Option<String>;
         if let Some(container) = source_connectors.get_mut(&path) {
             info!("Source container for plugin: {path} is already loaded.",);
-            init_source(
+            init_error = init_source(
                 &container.container,
                 &config.plugin_config.unwrap_or_default(),
                 plugin_id,
                 state,
-            );
+            )
+            .err()
+            .map(|e| e.to_string());
             container.plugins.push(SourceConnectorPlugin {
                 id: plugin_id,
                 key: key.to_owned(),
@@ -85,17 +90,20 @@ pub async fn init(
                 producer: None,
                 transforms: vec![],
                 state_storage,
+                error: init_error.clone(),
             });
         } else {
             let container: Container<SourceApi> =
                 unsafe { Container::load(&path).expect("Failed to load source container") };
             info!("Source container for plugin: {path} loaded successfully.",);
-            init_source(
+            init_error = init_source(
                 &container,
                 &config.plugin_config.unwrap_or_default(),
                 plugin_id,
                 state,
-            );
+            )
+            .err()
+            .map(|e| e.to_string());
             source_connectors.insert(
                 path.to_owned(),
                 SourceConnector {
@@ -109,14 +117,20 @@ pub async fn init(
                         producer: None,
                         transforms: vec![],
                         state_storage,
+                        error: init_error.clone(),
                     }],
                 },
             );
         }
 
-        info!(
-            "Source container with name: {name} ({key}), initialized successfully with ID: {plugin_id}."
-        );
+        if let Some(err) = init_error {
+            error!("Source container with name: {name} ({key}) failed to initialize: {err}");
+            continue;
+        } else {
+            info!(
+                "Source container with name: {name} ({key}), initialized successfully with ID: {plugin_id}."
+            );
+        }
         PLUGIN_ID.fetch_add(1, Ordering::Relaxed);
 
         let transforms = if let Some(transforms_config) = config.transforms {
@@ -174,19 +188,26 @@ fn init_source(
     plugin_config: &serde_json::Value,
     id: u32,
     state: Option<ConnectorState>,
-) {
+) -> Result<(), RuntimeError> {
     trace!("Initializing source plugin with config: {plugin_config:?} (ID: {id})");
     let plugin_config =
         serde_json::to_string(plugin_config).expect("Invalid source plugin config.");
     let state_ptr = state.as_ref().map_or(std::ptr::null(), |s| s.0.as_ptr());
     let state_len = state.as_ref().map_or(0, |s| s.0.len());
-    (container.open)(
+    let result = (container.open)(
         id,
         plugin_config.as_ptr(),
         plugin_config.len(),
         state_ptr,
         state_len,
     );
+    if result != 0 {
+        let err = format!("Plugin initialization failed (ID: {id})");
+        error!("{err}");
+        Err(RuntimeError::InvalidConfiguration(err))
+    } else {
+        Ok(())
+    }
 }
 
 fn get_state_storage(state_path: &str, key: &str) -> StateStorage {
@@ -194,11 +215,23 @@ fn get_state_storage(state_path: &str, key: &str) -> StateStorage {
     StateStorage::File(FileStateProvider::new(path))
 }
 
-pub fn handle(sources: Vec<SourceConnectorWrapper>) {
+pub fn handle(sources: Vec<SourceConnectorWrapper>, context: Arc<RuntimeContext>) {
     for source in sources {
         for plugin in source.plugins {
             let plugin_id = plugin.id;
-            info!("Starting handler for source connector with ID: {plugin_id}...");
+            let plugin_key = plugin.key.clone();
+            let context = context.clone();
+
+            if plugin.error.is_none() {
+                info!("Starting handler for source connector with ID: {plugin_id}...");
+            } else {
+                error!(
+                    "Failed to initialize source connector with ID: {plugin_id}: {}. Skipping...",
+                    plugin.error.as_ref().expect("Error should be present")
+                );
+                continue;
+            }
+
             let handle = source.callback;
             tokio::task::spawn_blocking(move || {
                 handle(plugin_id, handle_produced_messages);
@@ -212,8 +245,17 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>) {
                 info!("Source connector with ID: {plugin_id} started.");
                 let Some(producer) = &plugin.producer else {
                     error!("Producer not initialized for source connector with ID: {plugin_id}");
+                    context
+                        .sources
+                        .set_error(&plugin_key, "Producer not initialized")
+                        .await;
                     return;
                 };
+
+                context
+                    .sources
+                    .update_status(&plugin_key, ConnectorStatus::Running)
+                    .await;
                 let encoder = producer.encoder.clone();
                 let producer = &producer.producer;
                 let mut number = 1u64;
@@ -259,20 +301,24 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>) {
                         messages,
                         &plugin.transforms,
                     ) else {
-                        error!(
+                        let err = format!(
                             "Failed to process {count} messages by source connector with ID: {plugin_id} before sending them to stream: {}, topic: {}.",
                             producer.stream(),
                             producer.topic()
                         );
+                        error!(err);
+                        context.sources.set_error(&plugin_key, &err).await;
                         continue;
                     };
 
                     if let Err(error) = producer.send(iggy_messages).await {
-                        error!(
+                        let err = format!(
                             "Failed to send {count} messages to stream: {}, topic: {} by source connector with ID: {plugin_id}. {error}",
                             producer.stream(),
                             producer.topic(),
                         );
+                        error!(err);
+                        context.sources.set_error(&plugin_key, &err).await;
                         continue;
                     }
 
@@ -290,15 +336,23 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>) {
                     match &plugin.state_storage {
                         StateStorage::File(file) => {
                             if let Err(error) = file.save(state).await {
-                                error!(
+                                let err = format!(
                                     "Failed to save state for source connector with ID: {plugin_id}. {error}"
                                 );
+                                error!(err);
+                                context.sources.set_error(&plugin_key, &err).await;
                                 continue;
                             }
                             debug!("State saved for source connector with ID: {plugin_id}");
                         }
                     }
                 }
+
+                info!("Source connector with ID: {plugin_id} stopped.");
+                context
+                    .sources
+                    .update_status(&plugin_key, ConnectorStatus::Stopped)
+                    .await;
             });
         }
     }
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index 0ed445e8a..428721d9e 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -49,7 +49,7 @@ predicates = { workspace = true }
 rand = { workspace = true }
 rcgen = "0.14.5"
 reqwest = { workspace = true }
-rmcp = { version = "0.9.1", features = [
+rmcp = { version = "0.10.0", features = [
     "client",
     "reqwest",
     "transport-streamable-http-client",

Reply via email to