This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 8cb79069d feat(connectors): extend connectors' status reporting (#2434)
8cb79069d is described below
commit 8cb79069d8d9d03f714792b421814db50e5b79ce
Author: Maciej Modzelewski <[email protected]>
AuthorDate: Tue Dec 2 14:56:20 2025 +0100
feat(connectors): extend connectors' status reporting (#2434)
---
Cargo.lock | 26 +++++----
Cargo.toml | 2 +-
DEPENDENCIES.md | 9 +--
core/ai/mcp/Cargo.toml | 2 +-
core/connectors/runtime/src/api/models.rs | 20 +++++--
core/connectors/runtime/src/context.rs | 30 +++++++++-
core/connectors/runtime/src/main.rs | 8 ++-
core/connectors/runtime/src/manager/mod.rs | 4 +-
core/connectors/runtime/src/manager/sink.rs | 22 ++++++-
core/connectors/runtime/src/manager/source.rs | 22 ++++++-
core/connectors/runtime/src/manager/status.rs | 65 +++++++++++++++++++++
core/connectors/runtime/src/sink.rs | 70 ++++++++++++++++++-----
core/connectors/runtime/src/source.rs | 82 ++++++++++++++++++++++-----
core/integration/Cargo.toml | 2 +-
14 files changed, 307 insertions(+), 57 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 9a1814068..1998d3f78 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6631,6 +6631,12 @@ version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
+[[package]]
+name = "pastey"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "57d6c094ee800037dff99e02cab0eaf3142826586742a270ab3d7a62656bd27a"
+
[[package]]
name = "pbkdf2"
version = "0.12.2"
@@ -7098,9 +7104,9 @@ dependencies = [
[[package]]
name = "prost-reflect"
-version = "0.16.2"
+version = "0.16.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "89a3ac73ec9a9118131a4594c9d336631a07852220a1d0ae03ee36b04503a063"
+checksum = "b89455ef41ed200cafc47c76c552ee7792370ac420497e551f16123a9135f76e"
dependencies = [
"logos",
"miette",
@@ -7658,9 +7664,9 @@ checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422"
[[package]]
name = "rmcp"
-version = "0.9.1"
+version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "eaa07b85b779d1e1df52dd79f6c6bffbe005b191f07290136cc42a142da3409a"
+checksum = "38b18323edc657390a6ed4d7a9110b0dec2dc3ed128eb2a123edfbafabdbddc5"
dependencies = [
"async-trait",
"axum",
@@ -7671,7 +7677,7 @@ dependencies = [
"http 1.4.0",
"http-body",
"http-body-util",
- "paste",
+ "pastey",
"pin-project-lite",
"rand 0.9.2",
"reqwest",
@@ -7691,9 +7697,9 @@ dependencies = [
[[package]]
name = "rmcp-macros"
-version = "0.9.1"
+version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0f6fa09933cac0d0204c8a5d647f558425538ed6a0134b1ebb1ae4dc00c96db3"
+checksum = "c75d0a62676bf8c8003c4e3c348e2ceb6a7b3e48323681aaf177fdccdac2ce50"
dependencies = [
"darling 0.21.3",
"proc-macro2",
@@ -9922,14 +9928,14 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "uuid"
-version = "1.18.1"
+version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2"
+checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a"
dependencies = [
"getrandom 0.3.4",
"js-sys",
"rand 0.9.2",
- "serde",
+ "serde_core",
"wasm-bindgen",
"zerocopy",
]
diff --git a/Cargo.toml b/Cargo.toml
index 800ae36ac..355be9d16 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -196,7 +196,7 @@ tracing-subscriber = { version = "0.3.22", default-features = false, features =
trait-variant = "0.1.2"
tungstenite = "0.28.0"
twox-hash = { version = "2.1.2", features = ["xxhash32"] }
-uuid = { version = "1.18.1", features = [
+uuid = { version = "1.19.0", features = [
"v4",
"v7",
"fast-rng",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index f9ba6516f..239ffed5a 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -577,6 +577,7 @@ parse-display-derive: 0.9.1, "Apache-2.0 OR MIT",
passterm: 2.0.1, "BSD-3-Clause",
password-hash: 0.5.0, "Apache-2.0 OR MIT",
paste: 1.0.15, "Apache-2.0 OR MIT",
+pastey: 0.2.0, "Apache-2.0 OR MIT",
pbkdf2: 0.12.2, "Apache-2.0 OR MIT",
pear: 0.2.9, "Apache-2.0 OR MIT",
pear_codegen: 0.2.9, "Apache-2.0 OR MIT",
@@ -624,7 +625,7 @@ prometheus-client: 0.24.0, "Apache-2.0 OR MIT",
prometheus-client-derive-encode: 0.5.0, "Apache-2.0 OR MIT",
prost: 0.14.1, "Apache-2.0",
prost-derive: 0.14.1, "Apache-2.0",
-prost-reflect: 0.16.2, "Apache-2.0 OR MIT",
+prost-reflect: 0.16.3, "Apache-2.0 OR MIT",
prost-types: 0.14.1, "Apache-2.0",
protox: 0.9.0, "Apache-2.0 OR MIT",
protox-parse: 0.9.0, "Apache-2.0 OR MIT",
@@ -671,8 +672,8 @@ ringbuffer: 0.16.0, "MIT",
rkyv: 0.7.45, "MIT",
rkyv_derive: 0.7.45, "MIT",
rle-decode-fast: 1.0.3, "Apache-2.0 OR MIT",
-rmcp: 0.9.1, "MIT",
-rmcp-macros: 0.9.1, "MIT",
+rmcp: 0.10.0, "MIT",
+rmcp-macros: 0.10.0, "MIT",
roaring: 0.10.12, "Apache-2.0 OR MIT",
route-recognizer: 0.3.1, "MIT",
rsa: 0.9.9, "Apache-2.0 OR MIT",
@@ -870,7 +871,7 @@ utf-8: 0.7.6, "Apache-2.0 OR MIT",
utf8-width: 0.1.8, "MIT",
utf8_iter: 1.0.4, "Apache-2.0 OR MIT",
utf8parse: 0.2.2, "Apache-2.0 OR MIT",
-uuid: 1.18.1, "Apache-2.0 OR MIT",
+uuid: 1.19.0, "Apache-2.0 OR MIT",
v_htmlescape: 0.15.8, "Apache-2.0 OR MIT",
valuable: 0.1.1, "MIT",
value-trait: 0.12.1, "Apache-2.0 OR MIT",
diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml
index 5d47536ca..526517d1e 100644
--- a/core/ai/mcp/Cargo.toml
+++ b/core/ai/mcp/Cargo.toml
@@ -34,7 +34,7 @@ figlet-rs = { workspace = true }
figment = { workspace = true }
iggy = { workspace = true }
iggy_common = { workspace = true }
-rmcp = { version = "0.9.1", features = [
+rmcp = { version = "0.10.0", features = [
"server",
"transport-io",
"transport-sse-server",
diff --git a/core/connectors/runtime/src/api/models.rs b/core/connectors/runtime/src/api/models.rs
index f8832dfd1..2ce4176e6 100644
--- a/core/connectors/runtime/src/api/models.rs
+++ b/core/connectors/runtime/src/api/models.rs
@@ -20,7 +20,11 @@
use crate::configs::connectors::{
ConfigFormat, SinkConfig, SourceConfig, StreamConsumerConfig, StreamProducerConfig,
};
-use crate::manager::{sink::SinkInfo, source::SourceInfo};
+use crate::manager::{
+ sink::SinkInfo,
+ source::SourceInfo,
+ status::{ConnectorError, ConnectorStatus},
+};
use iggy_connector_sdk::transforms::TransformType;
use serde::{Deserialize, Serialize};
@@ -31,7 +35,9 @@ pub struct SinkInfoResponse {
pub name: String,
pub path: String,
pub enabled: bool,
- pub running: bool,
+ pub status: ConnectorStatus,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub last_error: Option<ConnectorError>,
pub plugin_config_format: Option<ConfigFormat>,
}
@@ -56,7 +62,9 @@ pub struct SourceInfoResponse {
pub name: String,
pub path: String,
pub enabled: bool,
- pub running: bool,
+ pub status: ConnectorStatus,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub last_error: Option<ConnectorError>,
pub plugin_config_format: Option<ConfigFormat>,
}
@@ -88,7 +96,8 @@ impl From<SinkInfo> for SinkInfoResponse {
name: sink.name,
path: sink.path,
enabled: sink.enabled,
- running: sink.running,
+ status: sink.status,
+ last_error: sink.last_error,
plugin_config_format: sink.plugin_config_format,
}
}
@@ -102,7 +111,8 @@ impl From<SourceInfo> for SourceInfoResponse {
name: source.name,
path: source.path,
enabled: source.enabled,
- running: source.running,
+ status: source.status,
+ last_error: source.last_error,
plugin_config_format: source.plugin_config_format,
}
}
diff --git a/core/connectors/runtime/src/context.rs b/core/connectors/runtime/src/context.rs
index ef9b18910..fe2a8e034 100644
--- a/core/connectors/runtime/src/context.rs
+++ b/core/connectors/runtime/src/context.rs
@@ -18,11 +18,13 @@
*/
use crate::configs::connectors::{ConnectorsConfigProvider, SinkConfig, SourceConfig};
use crate::configs::runtime::ConnectorsRuntimeConfig;
+use crate::manager::status::ConnectorError;
use crate::{
SinkConnectorWrapper, SourceConnectorWrapper,
manager::{
sink::{SinkDetails, SinkInfo, SinkManager},
source::{SourceDetails, SourceInfo, SourceManager},
+ status::ConnectorStatus,
},
};
use std::collections::HashMap;
@@ -64,6 +66,14 @@ fn map_sinks(
continue;
};
+ let status = if sink_plugin.error.is_some() {
+ ConnectorStatus::Error
+ } else if sink_config.enabled {
+ ConnectorStatus::Starting
+ } else {
+ ConnectorStatus::Stopped
+ };
+
sinks.push(SinkDetails {
info: SinkInfo {
id: sink_plugin.id,
@@ -71,7 +81,11 @@ fn map_sinks(
name: sink_plugin.name.to_owned(),
path: sink_plugin.path.to_owned(),
enabled: sink_config.enabled,
- running: sink_config.enabled,
+ status,
+ last_error: sink_plugin
+ .error
+ .as_ref()
+ .map(|err| ConnectorError::new(&err.to_string())),
plugin_config_format: sink_plugin.config_format,
},
config: sink_config.clone(),
@@ -93,6 +107,14 @@ fn map_sources(
continue;
};
+ let status = if source_plugin.error.is_some() {
+ ConnectorStatus::Error
+ } else if source_config.enabled {
+ ConnectorStatus::Starting
+ } else {
+ ConnectorStatus::Stopped
+ };
+
sources.push(SourceDetails {
info: SourceInfo {
id: source_plugin.id,
@@ -100,7 +122,11 @@ fn map_sources(
name: source_plugin.name.to_owned(),
path: source_plugin.path.to_owned(),
enabled: source_config.enabled,
- running: source_config.enabled,
+ status,
+ last_error: source_plugin
+ .error
+ .as_ref()
+ .map(|err| ConnectorError::new(&err.to_string())),
plugin_config_format: source_plugin.config_format,
},
config: source_config.clone(),
diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs
index 8da7add28..d1990d80f 100644
--- a/core/connectors/runtime/src/main.rs
+++ b/core/connectors/runtime/src/main.rs
@@ -189,10 +189,10 @@ async fn main() -> Result<(), RuntimeError> {
connectors_config_provider,
);
let context = Arc::new(context);
- api::init(&config.http, context).await;
+ api::init(&config.http, context.clone()).await;
- source::handle(source_wrappers);
- sink::consume(sink_wrappers);
+ source::handle(source_wrappers, context.clone());
+ sink::consume(sink_wrappers, context.clone());
info!("All sources and sinks spawned.");
#[cfg(unix)]
@@ -265,6 +265,7 @@ struct SinkConnectorPlugin {
path: String,
config_format: Option<ConfigFormat>,
consumers: Vec<SinkConnectorConsumer>,
+ error: Option<String>,
}
struct SinkConnectorConsumer {
@@ -298,6 +299,7 @@ struct SourceConnectorPlugin {
transforms: Vec<Arc<dyn Transform>>,
producer: Option<SourceConnectorProducer>,
state_storage: StateStorage,
+ error: Option<String>,
}
struct SourceConnectorProducer {
diff --git a/core/connectors/runtime/src/manager/mod.rs b/core/connectors/runtime/src/manager/mod.rs
index 8efc27b1a..475050c8a 100644
--- a/core/connectors/runtime/src/manager/mod.rs
+++ b/core/connectors/runtime/src/manager/mod.rs
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -18,3 +19,4 @@
pub mod sink;
pub mod source;
+pub mod status;
diff --git a/core/connectors/runtime/src/manager/sink.rs b/core/connectors/runtime/src/manager/sink.rs
index 65d594fa0..93ad280bd 100644
--- a/core/connectors/runtime/src/manager/sink.rs
+++ b/core/connectors/runtime/src/manager/sink.rs
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+use super::status::{ConnectorError, ConnectorStatus};
use crate::configs::connectors::{ConfigFormat, SinkConfig};
use dashmap::DashMap;
use std::collections::HashMap;
@@ -61,6 +62,24 @@ impl SinkManager {
}
results
}
+
+ pub async fn update_status(&self, key: &str, status: ConnectorStatus) {
+ if let Some(sink) = self.sinks.get(key) {
+ let mut sink = sink.lock().await;
+ sink.info.status = status;
+ if matches!(status, ConnectorStatus::Running | ConnectorStatus::Stopped) {
+ sink.info.last_error = None;
+ }
+ }
+ }
+
+ pub async fn set_error(&self, key: &str, error_message: &str) {
+ if let Some(sink) = self.sinks.get(key) {
+ let mut sink = sink.lock().await;
+ sink.info.status = ConnectorStatus::Error;
+ sink.info.last_error = Some(ConnectorError::new(error_message));
+ }
+ }
}
#[derive(Debug, Clone)]
@@ -70,7 +89,8 @@ pub struct SinkInfo {
pub name: String,
pub path: String,
pub enabled: bool,
- pub running: bool,
+ pub status: ConnectorStatus,
+ pub last_error: Option<ConnectorError>,
pub plugin_config_format: Option<ConfigFormat>,
}
diff --git a/core/connectors/runtime/src/manager/source.rs b/core/connectors/runtime/src/manager/source.rs
index 258151837..801a567e7 100644
--- a/core/connectors/runtime/src/manager/source.rs
+++ b/core/connectors/runtime/src/manager/source.rs
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+use super::status::{ConnectorError, ConnectorStatus};
use crate::configs::connectors::{ConfigFormat, SourceConfig};
use dashmap::DashMap;
use std::collections::HashMap;
@@ -61,6 +62,24 @@ impl SourceManager {
}
results
}
+
+ pub async fn update_status(&self, key: &str, status: ConnectorStatus) {
+ if let Some(source) = self.sources.get(key) {
+ let mut source = source.lock().await;
+ source.info.status = status;
+ if matches!(status, ConnectorStatus::Running | ConnectorStatus::Stopped) {
+ source.info.last_error = None;
+ }
+ }
+ }
+
+ pub async fn set_error(&self, key: &str, error_message: &str) {
+ if let Some(source) = self.sources.get(key) {
+ let mut source = source.lock().await;
+ source.info.status = ConnectorStatus::Error;
+ source.info.last_error = Some(ConnectorError::new(error_message));
+ }
+ }
}
#[derive(Debug, Clone)]
@@ -70,7 +89,8 @@ pub struct SourceInfo {
pub name: String,
pub path: String,
pub enabled: bool,
- pub running: bool,
+ pub status: ConnectorStatus,
+ pub last_error: Option<ConnectorError>,
pub plugin_config_format: Option<ConfigFormat>,
}
diff --git a/core/connectors/runtime/src/manager/status.rs b/core/connectors/runtime/src/manager/status.rs
new file mode 100644
index 000000000..00dacf79d
--- /dev/null
+++ b/core/connectors/runtime/src/manager/status.rs
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use iggy_common::IggyTimestamp;
+use serde::{Deserialize, Serialize};
+use strum::{AsRefStr, Display as StrumDisplay, EnumString};
+
+#[derive(
+ Debug,
+ Serialize,
+ Deserialize,
+ PartialEq,
+ Clone,
+ Copy,
+ AsRefStr,
+ StrumDisplay,
+ EnumString,
+ Default,
+)]
+#[serde(rename_all = "lowercase")]
+#[strum(serialize_all = "lowercase")]
+#[repr(u8)]
+pub enum ConnectorStatus {
+ /// Connector is initializing
+ Starting,
+ /// Connector is running normally
+ Running,
+ /// Connector is shutting down
+ Stopping,
+ /// Connector is stopped (disabled or shut down cleanly)
+ #[default]
+ Stopped,
+ /// Connector has encountered an error
+ Error,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ConnectorError {
+ pub message: String,
+ pub timestamp: IggyTimestamp,
+}
+
+impl ConnectorError {
+ pub fn new(message: &str) -> Self {
+ Self {
+ message: message.to_string(),
+ timestamp: IggyTimestamp::now(),
+ }
+ }
+}
diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs
index 6b1c9970c..abd04971e 100644
--- a/core/connectors/runtime/src/sink.rs
+++ b/core/connectors/runtime/src/sink.rs
@@ -18,6 +18,8 @@
*/
use crate::configs::connectors::SinkConfig;
+use crate::context::RuntimeContext;
+use crate::manager::status::ConnectorStatus;
use crate::{
PLUGIN_ID, RuntimeError, SinkApi, SinkConnector, SinkConnectorConsumer, SinkConnectorPlugin,
SinkConnectorWrapper, resolve_plugin_path, transform,
@@ -58,13 +60,16 @@ pub async fn init(
"Initializing sink container with name: {name} ({key}), config version: {}, plugin: {path}",
&config.version
);
+ let init_error: Option<String>;
if let Some(container) = sink_connectors.get_mut(&path) {
info!("Sink container for plugin: {path} is already loaded.",);
- init_sink(
+ init_error = init_sink(
&container.container,
&config.plugin_config.unwrap_or_default(),
plugin_id,
- );
+ )
+ .err()
+ .map(|error| error.to_string());
container.plugins.push(SinkConnectorPlugin {
id: plugin_id,
key: key.to_owned(),
@@ -72,16 +77,19 @@ pub async fn init(
path: path.to_owned(),
config_format: config.plugin_config_format,
consumers: vec![],
+ error: init_error.clone(),
});
} else {
let container: Container<SinkApi> =
unsafe { Container::load(&path).expect("Failed to load sink container") };
info!("Sink container for plugin: {path} loaded successfully.",);
- init_sink(
+ init_error = init_sink(
&container,
&config.plugin_config.unwrap_or_default(),
plugin_id,
- );
+ )
+ .err()
+ .map(|error| error.to_string());
sink_connectors.insert(
path.to_owned(),
SinkConnector {
@@ -93,14 +101,20 @@ pub async fn init(
path: path.to_owned(),
config_format: config.plugin_config_format,
consumers: vec![],
+ error: init_error.clone(),
}],
},
);
}
- info!(
- "Sink container with name: {name} ({key}), initialized successfully with ID: {plugin_id}."
- );
+ if let Some(error) = init_error {
+ error!("Failed to initialize sink container with name: {name} ({key}). {error}");
+ continue;
+ } else {
+ info!(
+ "Sink container with name: {name} ({key}), initialized successfully with ID: {plugin_id}."
+ );
+ }
PLUGIN_ID.fetch_add(1, Ordering::Relaxed);
let transforms = if let Some(transforms_config) = config.transforms {
@@ -161,12 +175,29 @@ pub async fn init(
Ok(sink_connectors)
}
-pub fn consume(sinks: Vec<SinkConnectorWrapper>) {
+pub fn consume(sinks: Vec<SinkConnectorWrapper>, context: Arc<RuntimeContext>) {
for sink in sinks {
for plugin in sink.plugins {
- info!("Starting consume for sink with ID: {}...", plugin.id);
+ if plugin.error.is_none() {
+ info!("Starting consume for sink with ID: {}...", plugin.id);
+ } else {
+ error!(
+ "Failed to initialize sink connector with ID: {}: {}. Skipping...",
+ plugin.id,
+ plugin.error.as_ref().expect("Error should be present")
+ );
+ continue;
+ }
for consumer in plugin.consumers {
+ let plugin_key = plugin.key.clone();
+ let context = context.clone();
+
tokio::spawn(async move {
+ context
+ .sinks
+ .update_status(&plugin_key, ConnectorStatus::Running)
+ .await;
+
if let Err(error) = consume_messages(
plugin.id,
consumer.decoder,
@@ -177,10 +208,12 @@ pub fn consume(sinks: Vec<SinkConnectorWrapper>) {
)
.await
{
- error!(
+ let err = format!(
"Failed to consume messages for sink connector with ID: {}. {error}",
plugin.id
);
+ error!(err);
+ context.sinks.set_error(&plugin_key, &err).await;
return;
}
info!(
@@ -249,7 +282,7 @@ async fn consume_messages(
error!(
"Failed to process {messages_count} messages for sink connector with ID: {plugin_id}. {error}",
);
- continue;
+ return Err(error);
}
let elapsed = start.elapsed();
@@ -262,9 +295,20 @@ async fn consume_messages(
Ok(())
}
-fn init_sink(container: &Container<SinkApi>, plugin_config: &serde_json::Value, id: u32) {
+fn init_sink(
+ container: &Container<SinkApi>,
+ plugin_config: &serde_json::Value,
+ id: u32,
+) -> Result<(), RuntimeError> {
let plugin_config = serde_json::to_string(plugin_config).expect("Invalid sink plugin config.");
- (container.open)(id, plugin_config.as_ptr(), plugin_config.len());
+ let result = (container.open)(id, plugin_config.as_ptr(), plugin_config.len());
+ if result != 0 {
+ let err = format!("Plugin initialization failed (ID: {id})");
+ error!("{err}");
+ Err(RuntimeError::InvalidConfiguration(err))
+ } else {
+ Ok(())
+ }
}
async fn process_messages(
diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs
index 462ae4487..26af0eaff 100644
--- a/core/connectors/runtime/src/source.rs
+++ b/core/connectors/runtime/src/source.rs
@@ -36,6 +36,8 @@ use std::{
use tracing::{debug, error, info, trace, warn};
use crate::configs::connectors::SourceConfig;
+use crate::context::RuntimeContext;
+use crate::manager::status::ConnectorStatus;
use crate::{
PLUGIN_ID, RuntimeError, SourceApi, SourceConnector, SourceConnectorPlugin,
SourceConnectorProducer, SourceConnectorWrapper, resolve_plugin_path,
@@ -68,14 +70,17 @@ pub async fn init(
let state = match &state_storage {
StateStorage::File(file) => file.load().await?,
};
+ let init_error: Option<String>;
if let Some(container) = source_connectors.get_mut(&path) {
info!("Source container for plugin: {path} is already loaded.",);
- init_source(
+ init_error = init_source(
&container.container,
&config.plugin_config.unwrap_or_default(),
plugin_id,
state,
- );
+ )
+ .err()
+ .map(|e| e.to_string());
container.plugins.push(SourceConnectorPlugin {
id: plugin_id,
key: key.to_owned(),
@@ -85,17 +90,20 @@ pub async fn init(
producer: None,
transforms: vec![],
state_storage,
+ error: init_error.clone(),
});
} else {
let container: Container<SourceApi> =
unsafe { Container::load(&path).expect("Failed to load source container") };
info!("Source container for plugin: {path} loaded successfully.",);
- init_source(
+ init_error = init_source(
&container,
&config.plugin_config.unwrap_or_default(),
plugin_id,
state,
- );
+ )
+ .err()
+ .map(|e| e.to_string());
source_connectors.insert(
path.to_owned(),
SourceConnector {
@@ -109,14 +117,20 @@ pub async fn init(
producer: None,
transforms: vec![],
state_storage,
+ error: init_error.clone(),
}],
},
);
}
- info!(
- "Source container with name: {name} ({key}), initialized successfully with ID: {plugin_id}."
- );
+ if let Some(err) = init_error {
+ error!("Source container with name: {name} ({key}) failed to initialize: {err}");
+ continue;
+ } else {
+ info!(
+ "Source container with name: {name} ({key}), initialized successfully with ID: {plugin_id}."
+ );
+ }
PLUGIN_ID.fetch_add(1, Ordering::Relaxed);
let transforms = if let Some(transforms_config) = config.transforms {
@@ -174,19 +188,26 @@ fn init_source(
plugin_config: &serde_json::Value,
id: u32,
state: Option<ConnectorState>,
-) {
+) -> Result<(), RuntimeError> {
trace!("Initializing source plugin with config: {plugin_config:?} (ID: {id})");
let plugin_config =
serde_json::to_string(plugin_config).expect("Invalid source plugin config.");
let state_ptr = state.as_ref().map_or(std::ptr::null(), |s| s.0.as_ptr());
let state_len = state.as_ref().map_or(0, |s| s.0.len());
- (container.open)(
+ let result = (container.open)(
id,
plugin_config.as_ptr(),
plugin_config.len(),
state_ptr,
state_len,
);
+ if result != 0 {
+ let err = format!("Plugin initialization failed (ID: {id})");
+ error!("{err}");
+ Err(RuntimeError::InvalidConfiguration(err))
+ } else {
+ Ok(())
+ }
}
fn get_state_storage(state_path: &str, key: &str) -> StateStorage {
@@ -194,11 +215,23 @@ fn get_state_storage(state_path: &str, key: &str) -> StateStorage {
StateStorage::File(FileStateProvider::new(path))
}
-pub fn handle(sources: Vec<SourceConnectorWrapper>) {
+pub fn handle(sources: Vec<SourceConnectorWrapper>, context: Arc<RuntimeContext>) {
for source in sources {
for plugin in source.plugins {
let plugin_id = plugin.id;
- info!("Starting handler for source connector with ID: {plugin_id}...");
+ let plugin_key = plugin.key.clone();
+ let context = context.clone();
+
+ if plugin.error.is_none() {
+ info!("Starting handler for source connector with ID: {plugin_id}...");
+ } else {
+ error!(
+ "Failed to initialize source connector with ID: {plugin_id}: {}. Skipping...",
+ plugin.error.as_ref().expect("Error should be present")
+ );
+ continue;
+ }
+
let handle = source.callback;
tokio::task::spawn_blocking(move || {
handle(plugin_id, handle_produced_messages);
@@ -212,8 +245,17 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>) {
info!("Source connector with ID: {plugin_id} started.");
let Some(producer) = &plugin.producer else {
error!("Producer not initialized for source connector with ID: {plugin_id}");
+ context
+ .sources
+ .set_error(&plugin_key, "Producer not initialized")
+ .await;
return;
};
+
+ context
+ .sources
+ .update_status(&plugin_key, ConnectorStatus::Running)
+ .await;
let encoder = producer.encoder.clone();
let producer = &producer.producer;
let mut number = 1u64;
@@ -259,20 +301,24 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>) {
messages,
&plugin.transforms,
) else {
- error!(
+ let err = format!(
"Failed to process {count} messages by source connector with ID: {plugin_id} before sending them to stream: {}, topic: {}.",
producer.stream(),
producer.topic()
);
+ error!(err);
+ context.sources.set_error(&plugin_key, &err).await;
continue;
};
if let Err(error) = producer.send(iggy_messages).await {
- error!(
+ let err = format!(
"Failed to send {count} messages to stream: {}, topic: {} by source connector with ID: {plugin_id}. {error}",
producer.stream(),
producer.topic(),
);
+ error!(err);
+ context.sources.set_error(&plugin_key, &err).await;
continue;
}
@@ -290,15 +336,23 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>) {
match &plugin.state_storage {
StateStorage::File(file) => {
if let Err(error) = file.save(state).await {
- error!(
+ let err = format!(
"Failed to save state for source connector with ID: {plugin_id}. {error}"
);
+ error!(err);
+ context.sources.set_error(&plugin_key, &err).await;
continue;
}
debug!("State saved for source connector with ID: {plugin_id}");
}
}
}
+
+ info!("Source connector with ID: {plugin_id} stopped.");
+ context
+ .sources
+ .update_status(&plugin_key, ConnectorStatus::Stopped)
+ .await;
});
}
}
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index 0ed445e8a..428721d9e 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -49,7 +49,7 @@ predicates = { workspace = true }
rand = { workspace = true }
rcgen = "0.14.5"
reqwest = { workspace = true }
-rmcp = { version = "0.9.1", features = [
+rmcp = { version = "0.10.0", features = [
"client",
"reqwest",
"transport-streamable-http-client",