This is an automated email from the ASF dual-hosted git repository.

hubcio pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cd1bec3a refactor(connectors): isolate per-connector init failures in 
runtime (#3244)
0cd1bec3a is described below

commit 0cd1bec3ab8c96ab4e3a9b35c60e9a50ba46bc1a
Author: Maciej Modzelewski <[email protected]>
AuthorDate: Thu May 14 00:30:34 2026 +0200

    refactor(connectors): isolate per-connector init failures in runtime (#3244)
    
    Previously, any failure during connector init (path resolution,
    dlopen, state load, plugin init, consumer/producer setup) was
    propagated as a fatal `RuntimeError` and aborted the entire
    runtime. A single bad config or missing plugin file took down
    every other healthy connector with it, and the failing connector
    was hidden from the API/health view since it never reached the
    manager.
    
    Capture per-connector failures inline against the offending
    plugin and continue iterating. Connectors that fail before their
    FFI container can be loaded are returned from `sink::init` /
    `source::init` as `FailedPlugin` entries and surfaced through the
    manager as `ConnectorStatus::Error` with the captured message in
    `last_error`, so operators can still see and diagnose them.
    
    The `Result` return is preserved on both init paths for symmetry
    and future fatal-error handling, but no system-level errors are
    currently emitted.
---
 core/connectors/runtime/src/context.rs             |  78 ++++-
 core/connectors/runtime/src/main.rs                |  42 ++-
 core/connectors/runtime/src/sink.rs                | 199 ++++++++-----
 core/connectors/runtime/src/source.rs              | 219 ++++++++------
 core/integration/tests/connectors/mod.rs           |   1 +
 .../tests/connectors/runtime/error_isolation.rs    | 324 +++++++++++++++++++++
 core/integration/tests/connectors/runtime/mod.rs   |  20 ++
 .../connectors/runtime/sink_invalid_config.toml    |  20 ++
 .../sink_invalid_config/stdout_invalid.toml        |  39 +++
 .../runtime/sink_invalid_config/stdout_valid.toml  |  39 +++
 .../connectors/runtime/sink_missing_plugin.toml    |  20 ++
 .../sink_missing_plugin_config/stdout_missing.toml |  40 +++
 .../sink_missing_plugin_config/stdout_valid.toml   |  39 +++
 .../connectors/runtime/source_invalid_config.toml  |  20 ++
 .../source_invalid_config/random_invalid.toml      |  42 +++
 .../source_invalid_config/random_valid.toml        |  40 +++
 .../connectors/runtime/source_invalid_state.toml   |  20 ++
 .../random_invalid_state.toml                      |  45 +++
 .../source_invalid_state_config/random_valid.toml  |  40 +++
 .../connectors/runtime/source_missing_plugin.toml  |  20 ++
 .../random_missing.toml                            |  42 +++
 .../source_missing_plugin_config/random_valid.toml |  40 +++
 22 files changed, 1225 insertions(+), 164 deletions(-)

diff --git a/core/connectors/runtime/src/context.rs 
b/core/connectors/runtime/src/context.rs
index a15d4bea6..a5b1ea9f6 100644
--- a/core/connectors/runtime/src/context.rs
+++ b/core/connectors/runtime/src/context.rs
@@ -21,7 +21,7 @@ use crate::configs::runtime::ConnectorsRuntimeConfig;
 use crate::metrics::Metrics;
 use crate::stream::IggyClients;
 use crate::{
-    SinkConnectorWrapper, SourceConnectorWrapper,
+    FailedPlugin, SinkConnectorWrapper, SourceConnectorWrapper,
     manager::{
         sink::{SinkDetails, SinkInfo, SinkManager},
         source::{SourceDetails, SourceInfo, SourceManager},
@@ -54,13 +54,20 @@ pub fn init(
     sources_config: &HashMap<String, SourceConfig>,
     sink_wrappers: &[SinkConnectorWrapper],
     source_wrappers: &[SourceConnectorWrapper],
+    failed_sinks: &[FailedPlugin],
+    failed_sources: &[FailedPlugin],
     config_provider: Box<dyn ConnectorsConfigProvider>,
     iggy_clients: Arc<IggyClients>,
     state_path: String,
 ) -> RuntimeContext {
     let metrics = Arc::new(Metrics::init());
-    let sinks = SinkManager::new(map_sinks(sinks_config, sink_wrappers));
-    let sources = SourceManager::new(map_sources(sources_config, 
source_wrappers));
+    let mut sink_details = map_sinks(sinks_config, sink_wrappers);
+    sink_details.extend(map_failed_sinks(sinks_config, failed_sinks));
+    let mut source_details = map_sources(sources_config, source_wrappers);
+    source_details.extend(map_failed_sources(sources_config, failed_sources));
+
+    let sinks = SinkManager::new(sink_details);
+    let sources = SourceManager::new(source_details);
 
     metrics.set_sinks_total(sinks_config.len() as u32);
     metrics.set_sources_total(sources_config.len() as u32);
@@ -167,3 +174,68 @@ fn map_sources(
     }
     sources
 }
+
+const UNKNOWN_PLUGIN_VERSION: &str = "unknown";
+
+fn map_failed_sinks(
+    sinks_config: &HashMap<String, SinkConfig>,
+    failed: &[FailedPlugin],
+) -> Vec<SinkDetails> {
+    let mut sinks = Vec::with_capacity(failed.len());
+    for plugin in failed {
+        let Some(config) = sinks_config.get(&plugin.key) else {
+            error!("Missing sink config for failed plugin: {}", plugin.key);
+            continue;
+        };
+        sinks.push(SinkDetails {
+            info: SinkInfo {
+                id: plugin.id,
+                key: plugin.key.clone(),
+                name: plugin.name.clone(),
+                path: plugin.path.clone(),
+                version: UNKNOWN_PLUGIN_VERSION.to_owned(),
+                enabled: plugin.enabled,
+                status: ConnectorStatus::Error,
+                last_error: Some(ConnectorError::new(&plugin.error)),
+                plugin_config_format: plugin.config_format,
+            },
+            config: config.clone(),
+            shutdown_tx: None,
+            task_handles: vec![],
+            container: None,
+            restart_guard: Arc::new(Mutex::new(())),
+        });
+    }
+    sinks
+}
+
+fn map_failed_sources(
+    sources_config: &HashMap<String, SourceConfig>,
+    failed: &[FailedPlugin],
+) -> Vec<SourceDetails> {
+    let mut sources = Vec::with_capacity(failed.len());
+    for plugin in failed {
+        let Some(config) = sources_config.get(&plugin.key) else {
+            error!("Missing source config for failed plugin: {}", plugin.key);
+            continue;
+        };
+        sources.push(SourceDetails {
+            info: SourceInfo {
+                id: plugin.id,
+                key: plugin.key.clone(),
+                name: plugin.name.clone(),
+                path: plugin.path.clone(),
+                version: UNKNOWN_PLUGIN_VERSION.to_owned(),
+                enabled: plugin.enabled,
+                status: ConnectorStatus::Error,
+                last_error: Some(ConnectorError::new(&plugin.error)),
+                plugin_config_format: plugin.config_format,
+            },
+            config: config.clone(),
+            handler_tasks: vec![],
+            container: None,
+            restart_guard: Arc::new(Mutex::new(())),
+        });
+    }
+    sources
+}
diff --git a/core/connectors/runtime/src/main.rs 
b/core/connectors/runtime/src/main.rs
index 908fe44ad..45ccff667 100644
--- a/core/connectors/runtime/src/main.rs
+++ b/core/connectors/runtime/src/main.rs
@@ -156,7 +156,7 @@ async fn main() -> Result<(), RuntimeError> {
         connectors_config.sinks().len()
     );
     let sources_config = connectors_config.sources();
-    let sources = source::init(
+    let (sources, failed_sources) = source::init(
         sources_config.clone(),
         &iggy_clients.producer,
         &config.state.path,
@@ -164,7 +164,7 @@ async fn main() -> Result<(), RuntimeError> {
     .await?;
 
     let sinks_config = connectors_config.sinks();
-    let sinks = sink::init(sinks_config.clone(), 
&iggy_clients.consumer).await?;
+    let (sinks, failed_sinks) = sink::init(sinks_config.clone(), 
&iggy_clients.consumer).await?;
 
     let mut sink_wrappers = vec![];
     let mut sink_containers_by_key: HashMap<String, Arc<Container<SinkApi>>> = 
HashMap::new();
@@ -200,6 +200,8 @@ async fn main() -> Result<(), RuntimeError> {
         sources_config,
         &sink_wrappers,
         &source_wrappers,
+        &failed_sinks,
+        &failed_sources,
         connectors_config_provider,
         iggy_clients.clone(),
         config.state.path.clone(),
@@ -459,6 +461,42 @@ struct SourceConnectorWrapper {
     plugins: Vec<SourceConnectorPlugin>,
 }
 
+/// Records a connector that failed before its FFI container could be loaded
+/// (path resolution, dlopen, or pre-container setup). Surfaced to the runtime
+/// API/health view as `ConnectorStatus::Error` so one bad config does not hide
+/// the connector or block healthy peers from running.
+pub(crate) struct FailedPlugin {
+    pub id: u32,
+    pub key: String,
+    pub name: String,
+    pub path: String,
+    pub config_format: Option<ConfigFormat>,
+    pub error: String,
+    pub enabled: bool,
+}
+
+impl FailedPlugin {
+    pub(crate) fn new(
+        id: u32,
+        key: &str,
+        name: &str,
+        path: &str,
+        config_format: Option<ConfigFormat>,
+        enabled: bool,
+        error: String,
+    ) -> Self {
+        Self {
+            id,
+            key: key.to_owned(),
+            name: name.to_owned(),
+            path: path.to_owned(),
+            config_format,
+            error,
+            enabled,
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/core/connectors/runtime/src/sink.rs 
b/core/connectors/runtime/src/sink.rs
index a0ab25b1a..0ab3c9547 100644
--- a/core/connectors/runtime/src/sink.rs
+++ b/core/connectors/runtime/src/sink.rs
@@ -22,8 +22,8 @@ use crate::context::RuntimeContext;
 use crate::log::LOG_CALLBACK;
 use crate::metrics::Metrics;
 use crate::{
-    PLUGIN_ID, RuntimeError, SinkApi, SinkConnector, SinkConnectorConsumer, 
SinkConnectorPlugin,
-    SinkConnectorWrapper, resolve_plugin_path, transform,
+    FailedPlugin, PLUGIN_ID, RuntimeError, SinkApi, SinkConnector, 
SinkConnectorConsumer,
+    SinkConnectorPlugin, SinkConnectorWrapper, resolve_plugin_path, transform,
 };
 use dlopen2::wrapper::Container;
 use futures::StreamExt;
@@ -46,11 +46,23 @@ use tokio::sync::watch;
 use tokio::task::JoinHandle;
 use tracing::{debug, error, info, warn};
 
+/// Initializes all enabled sink connectors.
+///
+/// Per-connector failures (path resolution, dlopen, plugin init,
+/// consumer/decoder/transform setup) are captured against the offending
+/// connector and do not abort the runtime. Connectors that fail before their
+/// FFI container can be loaded are returned in the second tuple element so
+/// they remain visible in health/status output.
+///
+/// Only system-level errors that prevent any connector from running are
+/// propagated as `Err`.
 pub async fn init(
     sink_configs: HashMap<String, SinkConfig>,
     iggy_client: &IggyClient,
-) -> Result<HashMap<String, SinkConnector>, RuntimeError> {
+) -> Result<(HashMap<String, SinkConnector>, Vec<FailedPlugin>), RuntimeError> 
{
     let mut sink_connectors: HashMap<String, SinkConnector> = HashMap::new();
+    let mut failed_plugins: Vec<FailedPlugin> = Vec::new();
+
     for (key, config) in sink_configs {
         let name = config.name.clone();
         if !config.enabled {
@@ -59,102 +71,135 @@ pub async fn init(
         }
 
         let plugin_id = PLUGIN_ID.fetch_add(1, Ordering::SeqCst);
-        let path = resolve_plugin_path(&config.path)?;
+
+        let path = match resolve_plugin_path(&config.path) {
+            Ok(path) => path,
+            Err(error) => {
+                let message = format!("Failed to resolve plugin path: 
{error}");
+                error!("Sink: {name} ({key}) - {message}");
+                failed_plugins.push(FailedPlugin::new(
+                    plugin_id,
+                    &key,
+                    &name,
+                    &config.path,
+                    config.plugin_config_format,
+                    config.enabled,
+                    message,
+                ));
+                continue;
+            }
+        };
+
         info!(
             "Initializing sink container with name: {name} ({key}), config 
version: {}, plugin: {path}",
             &config.version
         );
-        let init_error: Option<String>;
-        if let Some(container) = sink_connectors.get_mut(&path) {
-            info!("Sink container for plugin: {path} is already loaded.");
-            let version = get_plugin_version(&container.container);
-            init_error = init_sink(
-                &container.container,
-                &config.plugin_config.clone().unwrap_or_default(),
-                plugin_id,
-            )
-            .err()
-            .map(|error| error.to_string());
-            container.plugins.push(SinkConnectorPlugin {
-                id: plugin_id,
-                key: key.clone(),
-                name: name.clone(),
-                path: path.clone(),
-                version,
-                config_format: config.plugin_config_format,
-                consumers: vec![],
-                error: init_error.clone(),
-                verbose: config.verbose,
-            });
-        } else {
-            let container: Container<SinkApi> = unsafe {
-                Container::load(&path).map_err(|error| {
-                    RuntimeError::InvalidConfiguration(format!(
-                        "Failed to load sink container from {path}: {error}"
-                    ))
-                })?
+
+        if !sink_connectors.contains_key(&path) {
+            let container = match unsafe { Container::<SinkApi>::load(&path) } 
{
+                Ok(container) => container,
+                Err(error) => {
+                    let message = format!("Failed to load sink container from 
{path}: {error}");
+                    error!("Sink: {name} ({key}) - {message}");
+                    failed_plugins.push(FailedPlugin::new(
+                        plugin_id,
+                        &key,
+                        &name,
+                        &config.path,
+                        config.plugin_config_format,
+                        config.enabled,
+                        message,
+                    ));
+                    continue;
+                }
             };
             info!("Sink container for plugin: {path} loaded successfully.");
-            let version = get_plugin_version(&container);
-            init_error = init_sink(
-                &container,
-                &config.plugin_config.clone().unwrap_or_default(),
-                plugin_id,
-            )
-            .err()
-            .map(|error| error.to_string());
             sink_connectors.insert(
                 path.clone(),
                 SinkConnector {
                     container,
-                    plugins: vec![SinkConnectorPlugin {
-                        id: plugin_id,
-                        key: key.clone(),
-                        name: name.clone(),
-                        path: path.clone(),
-                        version,
-                        config_format: config.plugin_config_format,
-                        consumers: vec![],
-                        error: init_error.clone(),
-                        verbose: config.verbose,
-                    }],
+                    plugins: Vec::new(),
                 },
             );
+        } else {
+            info!("Sink container for plugin: {path} is already loaded.");
         }
 
+        let connector = sink_connectors
+            .get_mut(&path)
+            .expect("sink container was just ensured for this path");
+        let version = get_plugin_version(&connector.container);
+        let init_error = init_sink(
+            &connector.container,
+            &config.plugin_config.clone().unwrap_or_default(),
+            plugin_id,
+        )
+        .err()
+        .map(|error| error.to_string());
+
+        connector.plugins.push(SinkConnectorPlugin {
+            id: plugin_id,
+            key: key.clone(),
+            name: name.clone(),
+            path: path.clone(),
+            version,
+            config_format: config.plugin_config_format,
+            consumers: vec![],
+            error: init_error.clone(),
+            verbose: config.verbose,
+        });
+
         if let Some(error) = init_error {
             error!("Failed to initialize sink container with name: {name} 
({key}). {error}");
             continue;
-        } else {
-            info!(
-                "Sink container with name: {name} ({key}), initialized 
successfully with ID: {plugin_id}."
-            );
         }
 
-        let consumers = setup_sink_consumers(&key, &config, 
iggy_client).await?;
-        let connector = sink_connectors.get_mut(&path).ok_or_else(|| {
-            RuntimeError::InvalidConfiguration(format!("Sink connector not 
found for path: {path}"))
-        })?;
-        let plugin = connector
-            .plugins
-            .iter_mut()
-            .find(|p| p.id == plugin_id)
-            .ok_or_else(|| {
-                RuntimeError::InvalidConfiguration(format!(
-                    "Sink plugin not found for ID: {plugin_id}"
-                ))
-            })?;
-        for (consumer, decoder, batch_size, transforms) in consumers {
-            plugin.consumers.push(SinkConnectorConsumer {
-                consumer,
-                decoder,
-                batch_size,
-                transforms,
-            });
+        match setup_sink_consumers(&key, &config, iggy_client).await {
+            Ok(consumers) => {
+                let connector = sink_connectors
+                    .get_mut(&path)
+                    .expect("sink connector was inserted above");
+                let plugin = connector
+                    .plugins
+                    .iter_mut()
+                    .find(|plugin| plugin.id == plugin_id)
+                    .expect("sink plugin was pushed above");
+                for (consumer, decoder, batch_size, transforms) in consumers {
+                    plugin.consumers.push(SinkConnectorConsumer {
+                        consumer,
+                        decoder,
+                        batch_size,
+                        transforms,
+                    });
+                }
+                info!(
+                    "Sink container with name: {name} ({key}) initialized 
successfully with ID: {plugin_id}."
+                );
+            }
+            Err(error) => {
+                let message = format!("Failed to set up sink consumers: 
{error}");
+                error!("Sink: {name} ({key}) - {message}");
+                let connector = sink_connectors
+                    .get_mut(&path)
+                    .expect("sink connector was inserted above");
+                let close_result = 
(connector.container.iggy_sink_close)(plugin_id);
+                if close_result != 0 {
+                    warn!(
+                        "iggy_sink_close returned {close_result} while 
cleaning up failed sink connector with ID: {plugin_id} ({key})"
+                    );
+                }
+                if let Some(plugin) = connector
+                    .plugins
+                    .iter_mut()
+                    .find(|plugin| plugin.id == plugin_id)
+                {
+                    plugin.error = Some(message);
+                }
+            }
         }
     }
 
-    Ok(sink_connectors)
+    Ok((sink_connectors, failed_plugins))
 }
 
 pub fn consume(
diff --git a/core/connectors/runtime/src/source.rs 
b/core/connectors/runtime/src/source.rs
index 2c20eb1b0..5d992a60b 100644
--- a/core/connectors/runtime/src/source.rs
+++ b/core/connectors/runtime/src/source.rs
@@ -42,7 +42,7 @@ use crate::context::RuntimeContext;
 use crate::log::LOG_CALLBACK;
 use crate::metrics::ConnectorType;
 use crate::{
-    PLUGIN_ID, RuntimeError, SourceApi, SourceConnector, SourceConnectorPlugin,
+    FailedPlugin, PLUGIN_ID, RuntimeError, SourceApi, SourceConnector, 
SourceConnectorPlugin,
     SourceConnectorProducer, SourceConnectorWrapper, resolve_plugin_path,
     state::{FileStateProvider, StateProvider, StateStorage},
     transform,
@@ -56,12 +56,24 @@ pub fn cleanup_sender(plugin_id: u32) {
     SOURCE_SENDERS.remove(&plugin_id);
 }
 
+/// Initializes all enabled source connectors.
+///
+/// Per-connector failures (path resolution, dlopen, state load, plugin init,
+/// producer/encoder/transform setup) are captured against the offending
+/// connector and do not abort the runtime. Connectors that fail before their
+/// FFI container can be loaded are returned in the second tuple element so
+/// they remain visible in health/status output.
+///
+/// Only system-level errors that prevent any connector from running (e.g. a
+/// poisoned global state) are propagated as `Err`.
 pub async fn init(
     source_configs: HashMap<String, SourceConfig>,
     iggy_client: &IggyClient,
     state_path: &str,
-) -> Result<HashMap<String, SourceConnector>, RuntimeError> {
+) -> Result<(HashMap<String, SourceConnector>, Vec<FailedPlugin>), 
RuntimeError> {
     let mut source_connectors: HashMap<String, SourceConnector> = 
HashMap::new();
+    let mut failed_plugins: Vec<FailedPlugin> = Vec::new();
+
     for (key, config) in source_configs {
         let name = config.name.clone();
         if !config.enabled {
@@ -70,110 +82,153 @@ pub async fn init(
         }
 
         let plugin_id = PLUGIN_ID.fetch_add(1, Ordering::SeqCst);
-        let path = resolve_plugin_path(&config.path)?;
+
+        let path = match resolve_plugin_path(&config.path) {
+            Ok(path) => path,
+            Err(error) => {
+                let message = format!("Failed to resolve plugin path: 
{error}");
+                error!("Source: {name} ({key}) - {message}");
+                failed_plugins.push(FailedPlugin::new(
+                    plugin_id,
+                    &key,
+                    &name,
+                    &config.path,
+                    config.plugin_config_format,
+                    config.enabled,
+                    message,
+                ));
+                continue;
+            }
+        };
+
         info!(
             "Initializing source container with name: {name} ({key}), config 
version: {}, plugin: {path}",
             &config.version
         );
+
         let state_storage = get_state_storage(state_path, &key);
         let state = match &state_storage {
-            StateStorage::File(file) => file.load().await?,
+            StateStorage::File(file) => match file.load().await {
+                Ok(state) => state,
+                Err(error) => {
+                    let message = format!("Failed to load source state: 
{error}");
+                    error!("Source: {name} ({key}) - {message}");
+                    failed_plugins.push(FailedPlugin::new(
+                        plugin_id,
+                        &key,
+                        &name,
+                        &config.path,
+                        config.plugin_config_format,
+                        config.enabled,
+                        message,
+                    ));
+                    continue;
+                }
+            },
         };
-        let init_error: Option<String>;
-        if let Some(container) = source_connectors.get_mut(&path) {
-            info!("Source container for plugin: {path} is already loaded.");
-            let version = get_plugin_version(&container.container);
-            init_error = init_source(
-                &container.container,
-                &config.plugin_config.clone().unwrap_or_default(),
-                plugin_id,
-                state,
-            )
-            .err()
-            .map(|error| error.to_string());
-            container.plugins.push(SourceConnectorPlugin {
-                id: plugin_id,
-                key: key.clone(),
-                name: name.clone(),
-                path: path.clone(),
-                version,
-                config_format: config.plugin_config_format,
-                producer: None,
-                transforms: vec![],
-                state_storage,
-                error: init_error.clone(),
-                verbose: config.verbose,
-            });
-        } else {
-            let container: Container<SourceApi> = unsafe {
-                Container::load(&path).map_err(|error| {
-                    RuntimeError::InvalidConfiguration(format!(
-                        "Failed to load source container from {path}: {error}"
-                    ))
-                })?
+
+        if !source_connectors.contains_key(&path) {
+            let container = match unsafe { Container::<SourceApi>::load(&path) 
} {
+                Ok(container) => container,
+                Err(error) => {
+                    let message = format!("Failed to load source container 
from {path}: {error}");
+                    error!("Source: {name} ({key}) - {message}");
+                    failed_plugins.push(FailedPlugin::new(
+                        plugin_id,
+                        &key,
+                        &name,
+                        &config.path,
+                        config.plugin_config_format,
+                        config.enabled,
+                        message,
+                    ));
+                    continue;
+                }
             };
             info!("Source container for plugin: {path} loaded successfully.");
-            let version = get_plugin_version(&container);
-            init_error = init_source(
-                &container,
-                &config.plugin_config.clone().unwrap_or_default(),
-                plugin_id,
-                state,
-            )
-            .err()
-            .map(|error| error.to_string());
             source_connectors.insert(
                 path.clone(),
                 SourceConnector {
                     container,
-                    plugins: vec![SourceConnectorPlugin {
-                        id: plugin_id,
-                        key: key.clone(),
-                        name: name.clone(),
-                        path: path.clone(),
-                        version,
-                        config_format: config.plugin_config_format,
-                        producer: None,
-                        transforms: vec![],
-                        state_storage,
-                        error: init_error.clone(),
-                        verbose: config.verbose,
-                    }],
+                    plugins: Vec::new(),
                 },
             );
+        } else {
+            info!("Source container for plugin: {path} is already loaded.");
         }
 
+        let connector = source_connectors
+            .get_mut(&path)
+            .expect("source container was just ensured for this path");
+        let version = get_plugin_version(&connector.container);
+        let init_error = init_source(
+            &connector.container,
+            &config.plugin_config.clone().unwrap_or_default(),
+            plugin_id,
+            state,
+        )
+        .err()
+        .map(|error| error.to_string());
+
+        connector.plugins.push(SourceConnectorPlugin {
+            id: plugin_id,
+            key: key.clone(),
+            name: name.clone(),
+            path: path.clone(),
+            version,
+            config_format: config.plugin_config_format,
+            producer: None,
+            transforms: vec![],
+            state_storage,
+            error: init_error.clone(),
+            verbose: config.verbose,
+        });
+
         if let Some(error) = init_error {
             error!("Source container with name: {name} ({key}) failed to 
initialize: {error}");
             continue;
-        } else {
-            info!(
-                "Source container with name: {name} ({key}), initialized 
successfully with ID: {plugin_id}."
-            );
         }
 
-        let (producer, encoder, transforms) =
-            setup_source_producer(&key, &config, iggy_client).await?;
-
-        let connector = source_connectors.get_mut(&path).ok_or_else(|| {
-            RuntimeError::InvalidConfiguration(format!(
-                "Source connector not found for path: {path}"
-            ))
-        })?;
-        let plugin = connector
-            .plugins
-            .iter_mut()
-            .find(|p| p.id == plugin_id)
-            .ok_or_else(|| {
-                RuntimeError::InvalidConfiguration(format!(
-                    "Source plugin not found for ID: {plugin_id}"
-                ))
-            })?;
-        plugin.producer = Some(SourceConnectorProducer { producer, encoder });
-        plugin.transforms = transforms;
+        match setup_source_producer(&key, &config, iggy_client).await {
+            Ok((producer, encoder, transforms)) => {
+                let connector = source_connectors
+                    .get_mut(&path)
+                    .expect("source connector was inserted above");
+                let plugin = connector
+                    .plugins
+                    .iter_mut()
+                    .find(|plugin| plugin.id == plugin_id)
+                    .expect("source plugin was pushed above");
+                plugin.producer = Some(SourceConnectorProducer { producer, 
encoder });
+                plugin.transforms = transforms;
+                info!(
+                    "Source container with name: {name} ({key}) initialized 
successfully with ID: {plugin_id}."
+                );
+            }
+            Err(error) => {
+                let message = format!("Failed to set up source producer: 
{error}");
+                error!("Source: {name} ({key}) - {message}");
+                let connector = source_connectors
+                    .get_mut(&path)
+                    .expect("source connector was inserted above");
+                let close_result = 
(connector.container.iggy_source_close)(plugin_id);
+                if close_result != 0 {
+                    warn!(
+                        "iggy_source_close returned {close_result} while 
cleaning up failed source connector with ID: {plugin_id} ({key})"
+                    );
+                }
+                if let Some(plugin) = connector
+                    .plugins
+                    .iter_mut()
+                    .find(|plugin| plugin.id == plugin_id)
+                {
+                    plugin.error = Some(message);
+                }
+            }
+        }
     }
 
-    Ok(source_connectors)
+    Ok((source_connectors, failed_plugins))
 }
 
 fn get_plugin_version(container: &Container<SourceApi>) -> String {
diff --git a/core/integration/tests/connectors/mod.rs 
b/core/integration/tests/connectors/mod.rs
index 83600be27..adf97f1b7 100644
--- a/core/integration/tests/connectors/mod.rs
+++ b/core/integration/tests/connectors/mod.rs
@@ -29,6 +29,7 @@ mod mongodb;
 mod postgres;
 mod quickwit;
 mod random;
+mod runtime;
 mod stdout;
 
 use iggy_common::IggyTimestamp;
diff --git a/core/integration/tests/connectors/runtime/error_isolation.rs 
b/core/integration/tests/connectors/runtime/error_isolation.rs
new file mode 100644
index 000000000..3d6fbb977
--- /dev/null
+++ b/core/integration/tests/connectors/runtime/error_isolation.rs
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Per-connector error isolation tests for the connectors runtime.
+//!
+//! Each test drives the runtime with two connectors of the same kind: one
+//! deliberately misconfigured and one healthy. The assertions verify that:
+//!   * the runtime stays alive (`/health` returns "healthy"),
+//!   * the broken connector is surfaced via the API with
+//!     [`ConnectorStatus::Error`] and a populated `last_error`,
+//!   * the sibling healthy connector still reaches 
[`ConnectorStatus::Running`].
+//!
+//! Together the tests cover every per-connector failure branch in
+//! `source::init` and `sink::init`:
+//!   * plugin path resolution failure (missing `.so`),
+//!   * source state-load failure (unreachable state file),
+//!   * post-container setup failure (invalid duration in stream config).
+
+use iggy_connector_sdk::api::{
+    ConnectorStatus, HealthResponse, SinkInfoResponse, SourceInfoResponse,
+};
+use integration::harness::seeds;
+use integration::iggy_harness;
+use reqwest::Client;
+use std::time::Duration;
+use tokio::time::sleep;
+
+async fn assert_runtime_healthy(http_client: &Client, api_address: &str) {
+    let response = http_client
+        .get(format!("{api_address}/health"))
+        .send()
+        .await
+        .expect("Failed to query health endpoint");
+    assert_eq!(response.status(), 200);
+    let health: HealthResponse = response
+        .json()
+        .await
+        .expect("Failed to parse health response");
+    assert_eq!(health.status, "healthy");
+}
+
+async fn fetch_sinks(http_client: &Client, api_address: &str) -> 
Vec<SinkInfoResponse> {
+    let response = http_client
+        .get(format!("{api_address}/sinks"))
+        .send()
+        .await
+        .expect("Failed to query /sinks");
+    assert_eq!(response.status(), 200);
+    response.json().await.expect("Failed to parse sinks")
+}
+
+async fn fetch_sources(http_client: &Client, api_address: &str) -> 
Vec<SourceInfoResponse> {
+    let response = http_client
+        .get(format!("{api_address}/sources"))
+        .send()
+        .await
+        .expect("Failed to query /sources");
+    assert_eq!(response.status(), 200);
+    response.json().await.expect("Failed to parse sources")
+}
+
+#[iggy_harness(
+    server(connectors_runtime(
+        config_path = "tests/connectors/runtime/sink_invalid_config.toml"
+    )),
+    seed = seeds::connector_stream
+)]
+async fn sink_with_invalid_config_does_not_abort_runtime(harness: 
&TestHarness) {
+    let api_address = harness
+        .connectors_runtime()
+        .expect("connector runtime should be available")
+        .http_url();
+    let http_client = Client::new();
+
+    assert_runtime_healthy(&http_client, &api_address).await;
+    let sinks = fetch_sinks(&http_client, &api_address).await;
+
+    assert_eq!(
+        sinks.len(),
+        2,
+        "Both the invalid and the valid sink should be visible in the API"
+    );
+
+    let invalid_sink = sinks
+        .iter()
+        .find(|sink| sink.key == "stdout_invalid")
+        .expect("Invalid sink should be reported");
+    assert_eq!(invalid_sink.status, ConnectorStatus::Error);
+    let last_error = invalid_sink
+        .last_error
+        .as_ref()
+        .expect("Invalid sink should expose a last_error");
+    assert!(
+        last_error.message.contains("poll interval"),
+        "last_error should mention the misconfigured poll_interval, got: {}",
+        last_error.message
+    );
+
+    let valid_sink = sinks
+        .iter()
+        .find(|sink| sink.key == "stdout_valid")
+        .expect("Healthy sibling sink should be reported");
+    assert_eq!(valid_sink.status, ConnectorStatus::Running);
+    assert!(
+        valid_sink.last_error.is_none(),
+        "Healthy sibling sink should have no last_error"
+    );
+}
+
+#[iggy_harness(
+    server(connectors_runtime(
+        config_path = "tests/connectors/runtime/sink_missing_plugin.toml"
+    )),
+    seed = seeds::connector_stream
+)]
+async fn sink_with_missing_plugin_does_not_abort_runtime(harness: 
&TestHarness) {
+    let api_address = harness
+        .connectors_runtime()
+        .expect("connector runtime should be available")
+        .http_url();
+    let http_client = Client::new();
+
+    assert_runtime_healthy(&http_client, &api_address).await;
+    let sinks = fetch_sinks(&http_client, &api_address).await;
+
+    assert_eq!(
+        sinks.len(),
+        2,
+        "Both the missing-plugin sink and the valid sink should be visible in 
the API"
+    );
+
+    let missing_plugin_sink = sinks
+        .iter()
+        .find(|sink| sink.key == "stdout_missing_plugin")
+        .expect("Missing-plugin sink should be reported");
+    assert_eq!(missing_plugin_sink.status, ConnectorStatus::Error);
+    let last_error = missing_plugin_sink
+        .last_error
+        .as_ref()
+        .expect("Missing-plugin sink should expose a last_error");
+    assert!(
+        last_error.message.contains("Plugin library not found")
+            || last_error.message.contains("Failed to resolve plugin path"),
+        "last_error should mention the missing plugin path, got: {}",
+        last_error.message
+    );
+
+    let valid_sink = sinks
+        .iter()
+        .find(|sink| sink.key == "stdout_valid")
+        .expect("Healthy sibling sink should be reported");
+    assert_eq!(valid_sink.status, ConnectorStatus::Running);
+    assert!(
+        valid_sink.last_error.is_none(),
+        "Healthy sibling sink should have no last_error"
+    );
+}
+
+#[iggy_harness(
+    server(connectors_runtime(
+        config_path = "tests/connectors/runtime/source_invalid_state.toml"
+    )),
+    seed = seeds::connector_stream
+)]
+async fn source_with_invalid_state_does_not_abort_runtime(harness: 
&TestHarness) {
+    let api_address = harness
+        .connectors_runtime()
+        .expect("connector runtime should be available")
+        .http_url();
+    let http_client = Client::new();
+
+    sleep(Duration::from_millis(500)).await;
+
+    assert_runtime_healthy(&http_client, &api_address).await;
+    let sources = fetch_sources(&http_client, &api_address).await;
+
+    assert_eq!(
+        sources.len(),
+        2,
+        "Both the broken-state source and the valid source should be visible 
in the API"
+    );
+
+    let invalid_source = sources
+        .iter()
+        .find(|source| source.key == "random_invalid/state_missing_parent")
+        .expect("Source with broken state path should be reported");
+    assert_eq!(invalid_source.status, ConnectorStatus::Error);
+    let last_error = invalid_source
+        .last_error
+        .as_ref()
+        .expect("Source with broken state should expose a last_error");
+    assert!(
+        last_error.message.contains("Failed to load source state"),
+        "last_error should mention the failed state load, got: {}",
+        last_error.message
+    );
+
+    let valid_source = sources
+        .iter()
+        .find(|source| source.key == "random_valid")
+        .expect("Healthy sibling source should be reported");
+    assert_eq!(valid_source.status, ConnectorStatus::Running);
+    assert!(
+        valid_source.last_error.is_none(),
+        "Healthy sibling source should have no last_error"
+    );
+}
+
+#[iggy_harness(
+    server(connectors_runtime(
+        config_path = "tests/connectors/runtime/source_missing_plugin.toml"
+    )),
+    seed = seeds::connector_stream
+)]
+async fn source_with_missing_plugin_does_not_abort_runtime(harness: 
&TestHarness) {
+    let api_address = harness
+        .connectors_runtime()
+        .expect("connector runtime should be available")
+        .http_url();
+    let http_client = Client::new();
+
+    sleep(Duration::from_millis(500)).await;
+
+    assert_runtime_healthy(&http_client, &api_address).await;
+    let sources = fetch_sources(&http_client, &api_address).await;
+
+    assert_eq!(
+        sources.len(),
+        2,
+        "Both the missing-plugin source and the valid source should be visible 
in the API"
+    );
+
+    let missing_plugin_source = sources
+        .iter()
+        .find(|source| source.key == "random_missing_plugin")
+        .expect("Missing-plugin source should be reported");
+    assert_eq!(missing_plugin_source.status, ConnectorStatus::Error);
+    let last_error = missing_plugin_source
+        .last_error
+        .as_ref()
+        .expect("Missing-plugin source should expose a last_error");
+    assert!(
+        last_error.message.contains("Plugin library not found")
+            || last_error.message.contains("Failed to resolve plugin path"),
+        "last_error should mention the missing plugin path, got: {}",
+        last_error.message
+    );
+
+    let valid_source = sources
+        .iter()
+        .find(|source| source.key == "random_valid")
+        .expect("Healthy sibling source should be reported");
+    assert_eq!(valid_source.status, ConnectorStatus::Running);
+    assert!(
+        valid_source.last_error.is_none(),
+        "Healthy sibling source should have no last_error"
+    );
+}
+
+#[iggy_harness(
+    server(connectors_runtime(
+        config_path = "tests/connectors/runtime/source_invalid_config.toml"
+    )),
+    seed = seeds::connector_stream
+)]
+async fn source_with_invalid_config_does_not_abort_runtime(harness: 
&TestHarness) {
+    let api_address = harness
+        .connectors_runtime()
+        .expect("connector runtime should be available")
+        .http_url();
+    let http_client = Client::new();
+
+    sleep(Duration::from_millis(500)).await;
+
+    assert_runtime_healthy(&http_client, &api_address).await;
+    let sources = fetch_sources(&http_client, &api_address).await;
+
+    assert_eq!(
+        sources.len(),
+        2,
+        "Both the invalid and the valid source should be visible in the API"
+    );
+
+    let invalid_source = sources
+        .iter()
+        .find(|source| source.key == "random_invalid")
+        .expect("Invalid source should be reported");
+    assert_eq!(invalid_source.status, ConnectorStatus::Error);
+    let last_error = invalid_source
+        .last_error
+        .as_ref()
+        .expect("Invalid source should expose a last_error");
+    assert!(
+        last_error.message.contains("linger time"),
+        "last_error should mention the misconfigured linger_time, got: {}",
+        last_error.message
+    );
+
+    let valid_source = sources
+        .iter()
+        .find(|source| source.key == "random_valid")
+        .expect("Healthy sibling source should be reported");
+    assert_eq!(valid_source.status, ConnectorStatus::Running);
+    assert!(
+        valid_source.last_error.is_none(),
+        "Healthy sibling source should have no last_error"
+    );
+}
diff --git a/core/integration/tests/connectors/runtime/mod.rs 
b/core/integration/tests/connectors/runtime/mod.rs
new file mode 100644
index 000000000..e5b353fc3
--- /dev/null
+++ b/core/integration/tests/connectors/runtime/mod.rs
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+mod error_isolation;
diff --git a/core/integration/tests/connectors/runtime/sink_invalid_config.toml 
b/core/integration/tests/connectors/runtime/sink_invalid_config.toml
new file mode 100644
index 000000000..3d2c03553
--- /dev/null
+++ b/core/integration/tests/connectors/runtime/sink_invalid_config.toml
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[connectors]
+config_type = "local"
+config_dir = "tests/connectors/runtime/sink_invalid_config"
diff --git 
a/core/integration/tests/connectors/runtime/sink_invalid_config/stdout_invalid.toml
 
b/core/integration/tests/connectors/runtime/sink_invalid_config/stdout_invalid.toml
new file mode 100644
index 000000000..b4230b888
--- /dev/null
+++ 
b/core/integration/tests/connectors/runtime/sink_invalid_config/stdout_invalid.toml
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Deliberately broken sink: `poll_interval` cannot be parsed as a duration.
+# Used to assert that the runtime stays alive and reports this connector
+# with `ConnectorStatus::Error` instead of aborting startup.
+
+type = "sink"
+key = "stdout_invalid"
+enabled = true
+version = 0
+name = "Stdout invalid sink"
+path = "../../target/debug/libiggy_connector_stdout_sink"
+verbose = true
+
+[[streams]]
+stream = "test_stream"
+topics = ["test_topic"]
+schema = "json"
+batch_length = 100
+poll_interval = "not-a-valid-duration"
+consumer_group = "stdout_invalid_sink_connector"
+
+[plugin_config]
+print_payload = true
diff --git 
a/core/integration/tests/connectors/runtime/sink_invalid_config/stdout_valid.toml
 
b/core/integration/tests/connectors/runtime/sink_invalid_config/stdout_valid.toml
new file mode 100644
index 000000000..0670d68fd
--- /dev/null
+++ 
b/core/integration/tests/connectors/runtime/sink_invalid_config/stdout_valid.toml
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Healthy companion sink: verifies that a sibling connector with valid
+# configuration still starts and runs when another connector in the same
+# runtime fails initialization.
+
+type = "sink"
+key = "stdout_valid"
+enabled = true
+version = 0
+name = "Stdout valid sink"
+path = "../../target/debug/libiggy_connector_stdout_sink"
+verbose = true
+
+[[streams]]
+stream = "test_stream"
+topics = ["test_topic"]
+schema = "json"
+batch_length = 100
+poll_interval = "5ms"
+consumer_group = "stdout_valid_sink_connector"
+
+[plugin_config]
+print_payload = true
diff --git a/core/integration/tests/connectors/runtime/sink_missing_plugin.toml 
b/core/integration/tests/connectors/runtime/sink_missing_plugin.toml
new file mode 100644
index 000000000..7eb99d3b4
--- /dev/null
+++ b/core/integration/tests/connectors/runtime/sink_missing_plugin.toml
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[connectors]
+config_type = "local"
+config_dir = "tests/connectors/runtime/sink_missing_plugin_config"
diff --git 
a/core/integration/tests/connectors/runtime/sink_missing_plugin_config/stdout_missing.toml
 
b/core/integration/tests/connectors/runtime/sink_missing_plugin_config/stdout_missing.toml
new file mode 100644
index 000000000..608f34e32
--- /dev/null
+++ 
b/core/integration/tests/connectors/runtime/sink_missing_plugin_config/stdout_missing.toml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Deliberately broken sink: `path` points to a shared library that does not
+# exist on disk, exercising the pre-container failure branch in
+# `source::init` / `sink::init` (failed_plugins list, surfaced through
+# `map_failed_sinks` with `ConnectorStatus::Error`).
+
+type = "sink"
+key = "stdout_missing_plugin"
+enabled = true
+version = 0
+name = "Stdout missing plugin sink"
+path = "/nonexistent/path/to/libiggy_connector_stdout_sink_does_not_exist.so"
+verbose = true
+
+[[streams]]
+stream = "test_stream"
+topics = ["test_topic"]
+schema = "json"
+batch_length = 100
+poll_interval = "5ms"
+consumer_group = "stdout_missing_plugin_sink_connector"
+
+[plugin_config]
+print_payload = true
diff --git 
a/core/integration/tests/connectors/runtime/sink_missing_plugin_config/stdout_valid.toml
 
b/core/integration/tests/connectors/runtime/sink_missing_plugin_config/stdout_valid.toml
new file mode 100644
index 000000000..f9a52dd06
--- /dev/null
+++ 
b/core/integration/tests/connectors/runtime/sink_missing_plugin_config/stdout_valid.toml
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Healthy companion sink: verifies that a sibling connector with valid
+# configuration still starts and runs when another connector in the same
+# runtime fails to load its plugin.
+
+type = "sink"
+key = "stdout_valid"
+enabled = true
+version = 0
+name = "Stdout valid sink"
+path = "../../target/debug/libiggy_connector_stdout_sink"
+verbose = true
+
+[[streams]]
+stream = "test_stream"
+topics = ["test_topic"]
+schema = "json"
+batch_length = 100
+poll_interval = "5ms"
+consumer_group = "stdout_valid_sink_connector"
+
+[plugin_config]
+print_payload = true
diff --git 
a/core/integration/tests/connectors/runtime/source_invalid_config.toml 
b/core/integration/tests/connectors/runtime/source_invalid_config.toml
new file mode 100644
index 000000000..ec101f9cd
--- /dev/null
+++ b/core/integration/tests/connectors/runtime/source_invalid_config.toml
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[connectors]
+config_type = "local"
+config_dir = "tests/connectors/runtime/source_invalid_config"
diff --git 
a/core/integration/tests/connectors/runtime/source_invalid_config/random_invalid.toml
 
b/core/integration/tests/connectors/runtime/source_invalid_config/random_invalid.toml
new file mode 100644
index 000000000..2df8c39c0
--- /dev/null
+++ 
b/core/integration/tests/connectors/runtime/source_invalid_config/random_invalid.toml
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Deliberately broken source: `linger_time` cannot be parsed as a duration.
+# Used to assert that the runtime stays alive and reports this connector
+# with `ConnectorStatus::Error` after the FFI plugin instance has been
+# cleaned up (post-container Err arm in `source::init`).
+
+type = "source"
+key = "random_invalid"
+enabled = true
+version = 0
+name = "Random invalid source"
+path = "../../target/debug/libiggy_connector_random_source"
+plugin_config_format = "json"
+
+[[streams]]
+stream = "test_stream"
+topic = "test_topic"
+schema = "json"
+batch_length = 1000
+linger_time = "not-a-valid-duration"
+
+[plugin_config]
+interval = "100ms"
+max_count = 1_000_000
+messages_range = [1, 5]
+payload_size = 200
diff --git 
a/core/integration/tests/connectors/runtime/source_invalid_config/random_valid.toml
 
b/core/integration/tests/connectors/runtime/source_invalid_config/random_valid.toml
new file mode 100644
index 000000000..8810177d1
--- /dev/null
+++ 
b/core/integration/tests/connectors/runtime/source_invalid_config/random_valid.toml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Healthy companion source: verifies that a sibling connector still starts
+# and runs when another source in the same runtime fails producer setup.
+
+type = "source"
+key = "random_valid"
+enabled = true
+version = 0
+name = "Random valid source"
+path = "../../target/debug/libiggy_connector_random_source"
+plugin_config_format = "json"
+
+[[streams]]
+stream = "test_stream"
+topic = "test_topic"
+schema = "json"
+batch_length = 1000
+linger_time = "5ms"
+
+[plugin_config]
+interval = "100ms"
+max_count = 1_000_000
+messages_range = [1, 5]
+payload_size = 200
diff --git 
a/core/integration/tests/connectors/runtime/source_invalid_state.toml 
b/core/integration/tests/connectors/runtime/source_invalid_state.toml
new file mode 100644
index 000000000..bdb23b871
--- /dev/null
+++ b/core/integration/tests/connectors/runtime/source_invalid_state.toml
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[connectors]
+config_type = "local"
+config_dir = "tests/connectors/runtime/source_invalid_state_config"
diff --git 
a/core/integration/tests/connectors/runtime/source_invalid_state_config/random_invalid_state.toml
 
b/core/integration/tests/connectors/runtime/source_invalid_state_config/random_invalid_state.toml
new file mode 100644
index 000000000..c6632a152
--- /dev/null
+++ 
b/core/integration/tests/connectors/runtime/source_invalid_state_config/random_invalid_state.toml
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Deliberately broken source: the `/` in the connector key forces the runtime
+# to build a state file path of the form
+# `{state_dir}/source_random_invalid/state_missing_parent.state`.
+# The parent directory `source_random_invalid/` does not exist, so the state
+# load step in `source::init` returns `Err(CannotOpenStateFile)` and the
+# connector is recorded in the `failed_plugins` list. Exercises the
+# state-load failure branch of the per-connector error isolation.
+
+type = "source"
+key = "random_invalid/state_missing_parent"
+enabled = true
+version = 0
+name = "Random invalid state source"
+path = "../../target/debug/libiggy_connector_random_source"
+plugin_config_format = "json"
+
+[[streams]]
+stream = "test_stream"
+topic = "test_topic"
+schema = "json"
+batch_length = 1000
+linger_time = "5ms"
+
+[plugin_config]
+interval = "100ms"
+max_count = 1_000_000
+messages_range = [1, 5]
+payload_size = 200
diff --git 
a/core/integration/tests/connectors/runtime/source_invalid_state_config/random_valid.toml
 
b/core/integration/tests/connectors/runtime/source_invalid_state_config/random_valid.toml
new file mode 100644
index 000000000..82d49229e
--- /dev/null
+++ 
b/core/integration/tests/connectors/runtime/source_invalid_state_config/random_valid.toml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Healthy companion source: verifies that a sibling connector keeps running
+# when another source in the same runtime fails its state load.
+
+type = "source"
+key = "random_valid"
+enabled = true
+version = 0
+name = "Random valid source"
+path = "../../target/debug/libiggy_connector_random_source"
+plugin_config_format = "json"
+
+[[streams]]
+stream = "test_stream"
+topic = "test_topic"
+schema = "json"
+batch_length = 1000
+linger_time = "5ms"
+
+[plugin_config]
+interval = "100ms"
+max_count = 1_000_000
+messages_range = [1, 5]
+payload_size = 200
diff --git 
a/core/integration/tests/connectors/runtime/source_missing_plugin.toml 
b/core/integration/tests/connectors/runtime/source_missing_plugin.toml
new file mode 100644
index 000000000..1d810b479
--- /dev/null
+++ b/core/integration/tests/connectors/runtime/source_missing_plugin.toml
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[connectors]
+config_type = "local"
+config_dir = "tests/connectors/runtime/source_missing_plugin_config"
diff --git 
a/core/integration/tests/connectors/runtime/source_missing_plugin_config/random_missing.toml
 
b/core/integration/tests/connectors/runtime/source_missing_plugin_config/random_missing.toml
new file mode 100644
index 000000000..2deb6bbaf
--- /dev/null
+++ 
b/core/integration/tests/connectors/runtime/source_missing_plugin_config/random_missing.toml
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Deliberately broken source: `path` points to a shared library that does not
+# exist on disk, exercising the pre-container failure branch in
+# `source::init` (failed_plugins list, surfaced through `map_failed_sources`
+# with `ConnectorStatus::Error`).
+
+type = "source"
+key = "random_missing_plugin"
+enabled = true
+version = 0
+name = "Random missing plugin source"
+path = "/nonexistent/path/to/libiggy_connector_random_source_does_not_exist.so"
+plugin_config_format = "json"
+
+[[streams]]
+stream = "test_stream"
+topic = "test_topic"
+schema = "json"
+batch_length = 1000
+linger_time = "5ms"
+
+[plugin_config]
+interval = "100ms"
+max_count = 1_000_000
+messages_range = [1, 5]
+payload_size = 200
diff --git 
a/core/integration/tests/connectors/runtime/source_missing_plugin_config/random_valid.toml
 
b/core/integration/tests/connectors/runtime/source_missing_plugin_config/random_valid.toml
new file mode 100644
index 000000000..a06ad65aa
--- /dev/null
+++ 
b/core/integration/tests/connectors/runtime/source_missing_plugin_config/random_valid.toml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Healthy companion source: verifies that a sibling connector keeps running
+# when another source in the same runtime fails to load its plugin.
+
+type = "source"
+key = "random_valid"
+enabled = true
+version = 0
+name = "Random valid source"
+path = "../../target/debug/libiggy_connector_random_source"
+plugin_config_format = "json"
+
+[[streams]]
+stream = "test_stream"
+topic = "test_topic"
+schema = "json"
+batch_length = 1000
+linger_time = "5ms"
+
+[plugin_config]
+interval = "100ms"
+max_count = 1_000_000
+messages_range = [1, 5]
+payload_size = 200

Reply via email to