This is an automated email from the ASF dual-hosted git repository.

maciej pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new deb3eaad4 feat(connectors): restart connector with new config without 
runtime restart (#2781)
deb3eaad4 is described below

commit deb3eaad4eba2f25bbcece87c74fb4561cf399c7
Author: xin <[email protected]>
AuthorDate: Mon Mar 9 20:47:26 2026 +0900

    feat(connectors): restart connector with new config without runtime restart 
(#2781)
    
    Closes #2417
---
 core/connectors/runtime/src/api/sink.rs            |  20 +-
 core/connectors/runtime/src/api/source.rs          |  21 +-
 core/connectors/runtime/src/context.rs             |  16 +
 core/connectors/runtime/src/main.rs                | 156 ++++---
 core/connectors/runtime/src/manager/sink.rs        | 472 +++++++++++++++++++-
 core/connectors/runtime/src/manager/source.rs      | 471 +++++++++++++++++++-
 core/connectors/runtime/src/sink.rs                | 261 +++++++-----
 core/connectors/runtime/src/source.rs              | 473 ++++++++++++---------
 core/integration/tests/connectors/postgres/mod.rs  |   1 +
 .../tests/connectors/postgres/restart.rs           | 238 +++++++++++
 10 files changed, 1749 insertions(+), 380 deletions(-)

diff --git a/core/connectors/runtime/src/api/sink.rs 
b/core/connectors/runtime/src/api/sink.rs
index 0a2c8791b..50c9d9b7a 100644
--- a/core/connectors/runtime/src/api/sink.rs
+++ b/core/connectors/runtime/src/api/sink.rs
@@ -30,7 +30,7 @@ use axum::{
     extract::{Path, Query, State},
     http::{HeaderMap, StatusCode, header},
     response::IntoResponse,
-    routing::get,
+    routing::{get, post},
 };
 use serde::Deserialize;
 use std::sync::Arc;
@@ -52,6 +52,7 @@ pub fn router(state: Arc<RuntimeContext>) -> Router {
             "/sinks/{key}/configs/active",
             get(get_sink_active_config).put(update_sink_active_config),
         )
+        .route("/sinks/{key}/restart", post(restart_sink))
         .with_state(state)
 }
 
@@ -246,3 +247,20 @@ async fn delete_sink_config(
         .await?;
     Ok(StatusCode::NO_CONTENT)
 }
+
+async fn restart_sink(
+    State(context): State<Arc<RuntimeContext>>,
+    Path(key): Path<String>,
+) -> Result<StatusCode, ApiError> {
+    context
+        .sinks
+        .restart_connector(
+            &key,
+            context.config_provider.as_ref(),
+            &context.iggy_clients.consumer,
+            &context.metrics,
+            &context,
+        )
+        .await?;
+    Ok(StatusCode::NO_CONTENT)
+}
diff --git a/core/connectors/runtime/src/api/source.rs 
b/core/connectors/runtime/src/api/source.rs
index 7db1c9974..0387421fb 100644
--- a/core/connectors/runtime/src/api/source.rs
+++ b/core/connectors/runtime/src/api/source.rs
@@ -30,7 +30,7 @@ use axum::{
     extract::{Path, Query, State},
     http::{HeaderMap, StatusCode, header},
     response::IntoResponse,
-    routing::get,
+    routing::{get, post},
 };
 use serde::Deserialize;
 use std::sync::Arc;
@@ -55,6 +55,7 @@ pub fn router(state: Arc<RuntimeContext>) -> Router {
             "/sources/{key}/configs/active",
             get(get_source_active_config).put(update_source_active_config),
         )
+        .route("/sources/{key}/restart", post(restart_source))
         .with_state(state)
 }
 
@@ -249,3 +250,21 @@ async fn delete_source_config(
         .await?;
     Ok(StatusCode::NO_CONTENT)
 }
+
+async fn restart_source(
+    State(context): State<Arc<RuntimeContext>>,
+    Path(key): Path<String>,
+) -> Result<StatusCode, ApiError> {
+    context
+        .sources
+        .restart_connector(
+            &key,
+            context.config_provider.as_ref(),
+            &context.iggy_clients.producer,
+            &context.metrics,
+            &context.state_path,
+            &context,
+        )
+        .await?;
+    Ok(StatusCode::NO_CONTENT)
+}
diff --git a/core/connectors/runtime/src/context.rs 
b/core/connectors/runtime/src/context.rs
index 292ea226c..a87095463 100644
--- a/core/connectors/runtime/src/context.rs
+++ b/core/connectors/runtime/src/context.rs
@@ -19,6 +19,7 @@
 use crate::configs::connectors::{ConnectorsConfigProvider, SinkConfig, 
SourceConfig};
 use crate::configs::runtime::ConnectorsRuntimeConfig;
 use crate::metrics::Metrics;
+use crate::stream::IggyClients;
 use crate::{
     SinkConnectorWrapper, SourceConnectorWrapper,
     manager::{
@@ -31,6 +32,7 @@ use iggy_connector_sdk::api::ConnectorError;
 use iggy_connector_sdk::api::ConnectorStatus;
 use std::collections::HashMap;
 use std::sync::Arc;
+use tokio::sync::Mutex;
 use tracing::error;
 
 pub struct RuntimeContext {
@@ -40,8 +42,11 @@ pub struct RuntimeContext {
     pub config_provider: Arc<dyn ConnectorsConfigProvider>,
     pub metrics: Arc<Metrics>,
     pub start_time: IggyTimestamp,
+    pub iggy_clients: Arc<IggyClients>,
+    pub state_path: String,
 }
 
+#[allow(clippy::too_many_arguments)]
 pub fn init(
     config: &ConnectorsRuntimeConfig,
     sinks_config: &HashMap<String, SinkConfig>,
@@ -49,6 +54,8 @@ pub fn init(
     sink_wrappers: &[SinkConnectorWrapper],
     source_wrappers: &[SourceConnectorWrapper],
     config_provider: Box<dyn ConnectorsConfigProvider>,
+    iggy_clients: Arc<IggyClients>,
+    state_path: String,
 ) -> RuntimeContext {
     let metrics = Arc::new(Metrics::init());
     let sinks = SinkManager::new(map_sinks(sinks_config, sink_wrappers));
@@ -64,6 +71,8 @@ pub fn init(
         config_provider: Arc::from(config_provider),
         metrics,
         start_time: IggyTimestamp::now(),
+        iggy_clients,
+        state_path,
     }
 }
 
@@ -103,6 +112,10 @@ fn map_sinks(
                     plugin_config_format: sink_plugin.config_format,
                 },
                 config: sink_config.clone(),
+                shutdown_tx: None,
+                task_handles: vec![],
+                container: None,
+                restart_guard: Arc::new(Mutex::new(())),
             });
         }
     }
@@ -145,6 +158,9 @@ fn map_sources(
                     plugin_config_format: source_plugin.config_format,
                 },
                 config: source_config.clone(),
+                handler_tasks: vec![],
+                container: None,
+                restart_guard: Arc::new(Mutex::new(())),
             });
         }
     }
diff --git a/core/connectors/runtime/src/main.rs 
b/core/connectors/runtime/src/main.rs
index 1d53e71cf..cca15d47a 100644
--- a/core/connectors/runtime/src/main.rs
+++ b/core/connectors/runtime/src/main.rs
@@ -29,6 +29,7 @@ use figlet_rs::FIGfont;
 use iggy::prelude::{Client, IggyConsumer, IggyProducer};
 use iggy_connector_sdk::{
     StreamDecoder, StreamEncoder,
+    api::ConnectorStatus,
     sink::ConsumeCallback,
     source::{HandleCallback, SendCallback},
     transforms::Transform,
@@ -40,7 +41,7 @@ use std::{
     env,
     sync::{Arc, atomic::AtomicU32},
 };
-use tracing::info;
+use tracing::{error, info};
 
 mod api;
 pub(crate) mod configs;
@@ -69,8 +70,8 @@ static PLUGIN_ID: AtomicU32 = AtomicU32::new(1);
 const ALLOWED_PLUGIN_EXTENSIONS: [&str; 3] = ["so", "dylib", "dll"];
 const DEFAULT_CONFIG_PATH: &str = "core/connectors/runtime/config.toml";
 
-#[derive(WrapperApi)]
-struct SourceApi {
+#[derive(WrapperApi, Debug)]
+pub(crate) struct SourceApi {
     iggy_source_open: extern "C" fn(
         id: u32,
         config_ptr: *const u8,
@@ -84,8 +85,8 @@ struct SourceApi {
     iggy_source_version: extern "C" fn() -> *const std::ffi::c_char,
 }
 
-#[derive(WrapperApi)]
-struct SinkApi {
+#[derive(WrapperApi, Debug)]
+pub(crate) struct SinkApi {
     iggy_sink_open: extern "C" fn(
         id: u32,
         config_ptr: *const u8,
@@ -143,7 +144,7 @@ async fn main() -> Result<(), RuntimeError> {
 
     info!("State will be stored in: {}", config.state.path);
 
-    let iggy_clients = stream::init(config.iggy.clone()).await?;
+    let iggy_clients = Arc::new(stream::init(config.iggy.clone()).await?);
 
     let connectors_config_provider: Box<dyn ConnectorsConfigProvider> =
         create_connectors_config_provider(&config.connectors).await?;
@@ -166,47 +167,31 @@ async fn main() -> Result<(), RuntimeError> {
     let sinks = sink::init(sinks_config.clone(), 
&iggy_clients.consumer).await?;
 
     let mut sink_wrappers = vec![];
-    let mut sink_with_plugins = HashMap::new();
-    for (key, sink) in sinks {
-        let plugin_ids = sink
-            .plugins
-            .iter()
-            .filter(|plugin| plugin.error.is_none())
-            .map(|plugin| plugin.id)
-            .collect();
+    let mut sink_containers_by_key: HashMap<String, Arc<Container<SinkApi>>> = 
HashMap::new();
+    for (_path, sink) in sinks {
+        let container = Arc::new(sink.container);
+        let callback = container.iggy_sink_consume;
+        for plugin in &sink.plugins {
+            sink_containers_by_key.insert(plugin.key.clone(), 
container.clone());
+        }
         sink_wrappers.push(SinkConnectorWrapper {
-            callback: sink.container.iggy_sink_consume,
+            callback,
             plugins: sink.plugins,
         });
-        sink_with_plugins.insert(
-            key,
-            SinkWithPlugins {
-                container: sink.container,
-                plugin_ids,
-            },
-        );
     }
 
     let mut source_wrappers = vec![];
-    let mut source_with_plugins = HashMap::new();
-    for (key, source) in sources {
-        let plugin_ids = source
-            .plugins
-            .iter()
-            .filter(|plugin| plugin.error.is_none())
-            .map(|plugin| plugin.id)
-            .collect();
+    let mut source_containers_by_key: HashMap<String, 
Arc<Container<SourceApi>>> = HashMap::new();
+    for (_path, source) in sources {
+        let container = Arc::new(source.container);
+        let callback = container.iggy_source_handle;
+        for plugin in &source.plugins {
+            source_containers_by_key.insert(plugin.key.clone(), 
container.clone());
+        }
         source_wrappers.push(SourceConnectorWrapper {
-            callback: source.container.iggy_source_handle,
+            callback,
             plugins: source.plugins,
         });
-        source_with_plugins.insert(
-            key,
-            SourceWithPlugins {
-                container: source.container,
-                plugin_ids,
-            },
-        );
     }
 
     let context = context::init(
@@ -216,13 +201,47 @@ async fn main() -> Result<(), RuntimeError> {
         &sink_wrappers,
         &source_wrappers,
         connectors_config_provider,
+        iggy_clients.clone(),
+        config.state.path.clone(),
     );
+    for (key, container) in sink_containers_by_key {
+        if let Some(details) = context.sinks.get(&key).await {
+            let mut details = details.lock().await;
+            details.container = Some(container);
+        }
+    }
+    for (key, container) in source_containers_by_key {
+        if let Some(details) = context.sources.get(&key).await {
+            let mut details = details.lock().await;
+            details.container = Some(container);
+        }
+    }
+
     let context = Arc::new(context);
-    api::init(&config.http, context.clone()).await;
 
-    let source_handler_tasks = source::handle(source_wrappers, 
context.clone());
-    sink::consume(sink_wrappers, context.clone());
+    let source_handles = source::handle(source_wrappers, context.clone());
+    for (key, handler_tasks) in source_handles {
+        if let Some(details) = context.sources.get(&key).await {
+            let mut details = details.lock().await;
+            details.handler_tasks = handler_tasks;
+        }
+    }
+
+    let sink_handles = sink::consume(sink_wrappers, context.clone());
+    for (key, shutdown_tx, task_handles) in sink_handles {
+        if let Some(details) = context.sinks.get(&key).await {
+            let mut details = details.lock().await;
+            details.shutdown_tx = Some(shutdown_tx);
+            details.task_handles = task_handles;
+        }
+        context
+            .sinks
+            .update_status(&key, ConnectorStatus::Running, 
Some(&context.metrics))
+            .await;
+    }
+
     info!("All sources and sinks spawned.");
+    api::init(&config.http, context.clone()).await;
 
     #[cfg(unix)]
     let (mut ctrl_c, mut sigterm) = {
@@ -243,26 +262,37 @@ async fn main() -> Result<(), RuntimeError> {
         }
     }
 
-    for (key, source) in source_with_plugins {
-        for id in source.plugin_ids {
-            info!("Closing source connector with ID: {id} for plugin: {key}");
-            source.container.iggy_source_close(id);
-            source::cleanup_sender(id);
-            info!("Closed source connector with ID: {id} for plugin: {key}");
+    let source_keys: Vec<String> = context
+        .sources
+        .get_all()
+        .await
+        .into_iter()
+        .map(|s| s.key)
+        .collect();
+    for key in &source_keys {
+        if let Err(err) = context
+            .sources
+            .stop_connector_with_guard(key, &context.metrics)
+            .await
+        {
+            error!("Failed to stop source connector: {key}. {err}");
         }
     }
 
-    // Wait for source handler tasks to drain remaining messages and persist 
state
-    // before shutting down the Iggy clients they depend on.
-    for handle in source_handler_tasks {
-        let _ = tokio::time::timeout(std::time::Duration::from_secs(5), 
handle).await;
-    }
-
-    for (key, sink) in sink_with_plugins {
-        for id in sink.plugin_ids {
-            info!("Closing sink connector with ID: {id} for plugin: {key}");
-            sink.container.iggy_sink_close(id);
-            info!("Closed sink connector with ID: {id} for plugin: {key}");
+    let sink_keys: Vec<String> = context
+        .sinks
+        .get_all()
+        .await
+        .into_iter()
+        .map(|s| s.key)
+        .collect();
+    for key in &sink_keys {
+        if let Err(err) = context
+            .sinks
+            .stop_connector_with_guard(key, &context.metrics)
+            .await
+        {
+            error!("Failed to stop sink connector: {key}. {err}");
         }
     }
 
@@ -400,11 +430,6 @@ struct SinkConnectorWrapper {
     plugins: Vec<SinkConnectorPlugin>,
 }
 
-struct SinkWithPlugins {
-    container: Container<SinkApi>,
-    plugin_ids: Vec<u32>,
-}
-
 struct SourceConnector {
     container: Container<SourceApi>,
     plugins: Vec<SourceConnectorPlugin>,
@@ -429,11 +454,6 @@ struct SourceConnectorProducer {
     producer: IggyProducer,
 }
 
-struct SourceWithPlugins {
-    container: Container<SourceApi>,
-    plugin_ids: Vec<u32>,
-}
-
 struct SourceConnectorWrapper {
     callback: HandleCallback,
     plugins: Vec<SourceConnectorPlugin>,
diff --git a/core/connectors/runtime/src/manager/sink.rs 
b/core/connectors/runtime/src/manager/sink.rs
index 0bc058201..9c6cafa1e 100644
--- a/core/connectors/runtime/src/manager/sink.rs
+++ b/core/connectors/runtime/src/manager/sink.rs
@@ -16,13 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-use crate::configs::connectors::{ConfigFormat, SinkConfig};
+use crate::PLUGIN_ID;
+use crate::SinkApi;
+use crate::configs::connectors::{ConfigFormat, ConnectorsConfigProvider, 
SinkConfig};
+use crate::context::RuntimeContext;
+use crate::error::RuntimeError;
 use crate::metrics::Metrics;
+use crate::sink;
 use dashmap::DashMap;
+use dlopen2::wrapper::Container;
+use iggy::prelude::IggyClient;
 use iggy_connector_sdk::api::{ConnectorError, ConnectorStatus};
 use std::collections::HashMap;
+use std::fmt;
 use std::sync::Arc;
-use tokio::sync::Mutex;
+use std::sync::atomic::Ordering;
+use std::time::Duration;
+use tokio::sync::{Mutex, watch};
+use tokio::task::JoinHandle;
+use tracing::info;
 
 #[derive(Debug)]
 pub struct SinkManager {
@@ -96,6 +108,166 @@ impl SinkManager {
             sink.info.last_error = Some(ConnectorError::new(error_message));
         }
     }
+
+    pub async fn stop_connector_with_guard(
+        &self,
+        key: &str,
+        metrics: &Arc<Metrics>,
+    ) -> Result<(), RuntimeError> {
+        let guard = {
+            let details = self
+                .sinks
+                .get(key)
+                .map(|e| e.value().clone())
+                .ok_or_else(|| RuntimeError::SinkNotFound(key.to_string()))?;
+            let details = details.lock().await;
+            details.restart_guard.clone()
+        };
+        let _lock = guard.lock().await;
+        self.stop_connector(key, metrics).await
+    }
+
+    pub async fn stop_connector(
+        &self,
+        key: &str,
+        metrics: &Arc<Metrics>,
+    ) -> Result<(), RuntimeError> {
+        let details = self
+            .sinks
+            .get(key)
+            .map(|e| e.value().clone())
+            .ok_or_else(|| RuntimeError::SinkNotFound(key.to_string()))?;
+
+        let (shutdown_tx, task_handles, plugin_id, container) = {
+            let mut details = details.lock().await;
+            (
+                details.shutdown_tx.take(),
+                std::mem::take(&mut details.task_handles),
+                details.info.id,
+                details.container.clone(),
+            )
+        };
+
+        if let Some(tx) = shutdown_tx {
+            let _ = tx.send(());
+        }
+
+        for handle in task_handles {
+            let _ = tokio::time::timeout(Duration::from_secs(5), handle).await;
+        }
+
+        if let Some(container) = &container {
+            info!("Closing sink connector with ID: {plugin_id} for plugin: 
{key}");
+            (container.iggy_sink_close)(plugin_id);
+            info!("Closed sink connector with ID: {plugin_id} for plugin: 
{key}");
+        }
+
+        {
+            let mut details = details.lock().await;
+            let old_status = details.info.status;
+            details.info.status = ConnectorStatus::Stopped;
+            details.info.last_error = None;
+            if old_status == ConnectorStatus::Running {
+                metrics.decrement_sinks_running();
+            }
+        }
+
+        Ok(())
+    }
+
+    pub async fn start_connector(
+        &self,
+        key: &str,
+        config: &SinkConfig,
+        iggy_client: &IggyClient,
+        metrics: &Arc<Metrics>,
+        context: &Arc<RuntimeContext>,
+    ) -> Result<(), RuntimeError> {
+        let details = self
+            .sinks
+            .get(key)
+            .map(|e| e.value().clone())
+            .ok_or_else(|| RuntimeError::SinkNotFound(key.to_string()))?;
+
+        let container = {
+            let details = details.lock().await;
+            details.container.clone().ok_or_else(|| {
+                RuntimeError::InvalidConfiguration(format!("No container 
loaded for sink: {key}"))
+            })?
+        };
+
+        let plugin_id = PLUGIN_ID.fetch_add(1, Ordering::SeqCst);
+
+        sink::init_sink(
+            &container,
+            &config.plugin_config.clone().unwrap_or_default(),
+            plugin_id,
+        )?;
+        info!("Sink connector with ID: {plugin_id} for plugin: {key} 
initialized successfully.");
+
+        let consumers = sink::setup_sink_consumers(key, config, 
iggy_client).await?;
+
+        let callback = container.iggy_sink_consume;
+        let (shutdown_tx, task_handles) = sink::spawn_consume_tasks(
+            plugin_id,
+            key,
+            consumers,
+            callback,
+            config.verbose,
+            metrics,
+            context.clone(),
+        );
+
+        {
+            let mut details = details.lock().await;
+            details.info.id = plugin_id;
+            details.info.status = ConnectorStatus::Running;
+            details.info.last_error = None;
+            details.config = config.clone();
+            details.shutdown_tx = Some(shutdown_tx);
+            details.task_handles = task_handles;
+            metrics.increment_sinks_running();
+        }
+
+        Ok(())
+    }
+
+    pub async fn restart_connector(
+        &self,
+        key: &str,
+        config_provider: &dyn ConnectorsConfigProvider,
+        iggy_client: &IggyClient,
+        metrics: &Arc<Metrics>,
+        context: &Arc<RuntimeContext>,
+    ) -> Result<(), RuntimeError> {
+        let guard = {
+            let details = self
+                .sinks
+                .get(key)
+                .map(|e| e.value().clone())
+                .ok_or_else(|| RuntimeError::SinkNotFound(key.to_string()))?;
+            let details = details.lock().await;
+            details.restart_guard.clone()
+        };
+        let Ok(_lock) = guard.try_lock() else {
+            info!("Restart already in progress for sink connector: {key}, 
skipping.");
+            return Ok(());
+        };
+
+        info!("Restarting sink connector: {key}");
+        self.stop_connector(key, metrics).await?;
+
+        let config = config_provider
+            .get_sink_config(key, None)
+            .await
+            .map_err(|e| RuntimeError::InvalidConfiguration(e.to_string()))?
+            .ok_or_else(|| RuntimeError::SinkNotFound(key.to_string()))?;
+
+        self.start_connector(key, &config, iggy_client, metrics, context)
+            .await?;
+        info!("Sink connector: {key} restarted successfully.");
+        Ok(())
+    }
 }
 
 #[derive(Debug, Clone)]
@@ -111,8 +283,302 @@ pub struct SinkInfo {
     pub plugin_config_format: Option<ConfigFormat>,
 }
 
-#[derive(Debug)]
 pub struct SinkDetails {
     pub info: SinkInfo,
     pub config: SinkConfig,
+    pub shutdown_tx: Option<watch::Sender<()>>,
+    pub task_handles: Vec<JoinHandle<()>>,
+    pub container: Option<Arc<Container<SinkApi>>>,
+    pub restart_guard: Arc<Mutex<()>>,
+}
+
+impl fmt::Debug for SinkDetails {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("SinkDetails")
+            .field("info", &self.info)
+            .field("config", &self.config)
+            .field("container", &self.container.as_ref().map(|_| "..."))
+            .finish()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::configs::connectors::SinkConfig;
+
+    fn create_test_sink_info(key: &str, id: u32) -> SinkInfo {
+        SinkInfo {
+            id,
+            key: key.to_string(),
+            name: format!("{key} sink"),
+            path: format!("/path/to/{key}"),
+            version: "1.0.0".to_string(),
+            enabled: true,
+            status: ConnectorStatus::Running,
+            last_error: None,
+            plugin_config_format: None,
+        }
+    }
+
+    fn create_test_sink_details(key: &str, id: u32) -> SinkDetails {
+        SinkDetails {
+            info: create_test_sink_info(key, id),
+            config: SinkConfig {
+                key: key.to_string(),
+                enabled: true,
+                version: 1,
+                name: format!("{key} sink"),
+                path: format!("/path/to/{key}"),
+                ..Default::default()
+            },
+            shutdown_tx: None,
+            task_handles: vec![],
+            container: None,
+            restart_guard: Arc::new(Mutex::new(())),
+        }
+    }
+
+    #[tokio::test]
+    async fn should_create_manager_with_sinks() {
+        let manager = SinkManager::new(vec![
+            create_test_sink_details("es", 1),
+            create_test_sink_details("pg", 2),
+        ]);
+
+        let all = manager.get_all().await;
+        assert_eq!(all.len(), 2);
+    }
+
+    #[tokio::test]
+    async fn should_get_existing_sink() {
+        let manager = SinkManager::new(vec![create_test_sink_details("es", 
1)]);
+
+        let sink = manager.get("es").await;
+        assert!(sink.is_some());
+        let binding = sink.unwrap();
+        let details = binding.lock().await;
+        assert_eq!(details.info.key, "es");
+        assert_eq!(details.info.id, 1);
+    }
+
+    #[tokio::test]
+    async fn should_return_none_for_unknown_key() {
+        let manager = SinkManager::new(vec![create_test_sink_details("es", 
1)]);
+
+        assert!(manager.get("nonexistent").await.is_none());
+    }
+
+    #[tokio::test]
+    async fn should_get_config() {
+        let manager = SinkManager::new(vec![create_test_sink_details("es", 
1)]);
+
+        let config = manager.get_config("es").await;
+        assert!(config.is_some());
+        assert_eq!(config.unwrap().key, "es");
+    }
+
+    #[tokio::test]
+    async fn should_return_none_config_for_unknown_key() {
+        let manager = SinkManager::new(vec![]);
+
+        assert!(manager.get_config("nonexistent").await.is_none());
+    }
+
+    #[tokio::test]
+    async fn should_get_all_sinks() {
+        let manager = SinkManager::new(vec![
+            create_test_sink_details("es", 1),
+            create_test_sink_details("pg", 2),
+            create_test_sink_details("stdout", 3),
+        ]);
+
+        let all = manager.get_all().await;
+        assert_eq!(all.len(), 3);
+        let keys: Vec<String> = all.iter().map(|s| s.key.clone()).collect();
+        assert!(keys.contains(&"es".to_string()));
+        assert!(keys.contains(&"pg".to_string()));
+        assert!(keys.contains(&"stdout".to_string()));
+    }
+
+    #[tokio::test]
+    async fn should_update_status() {
+        let manager = SinkManager::new(vec![create_test_sink_details("es", 
1)]);
+
+        manager
+            .update_status("es", ConnectorStatus::Stopped, None)
+            .await;
+
+        let sink = manager.get("es").await.unwrap();
+        let details = sink.lock().await;
+        assert_eq!(details.info.status, ConnectorStatus::Stopped);
+    }
+
+    #[tokio::test]
+    async fn should_increment_metrics_when_transitioning_to_running() {
+        let metrics = Arc::new(Metrics::init());
+        let mut details = create_test_sink_details("es", 1);
+        details.info.status = ConnectorStatus::Stopped;
+        let manager = SinkManager::new(vec![details]);
+
+        manager
+            .update_status("es", ConnectorStatus::Running, Some(&metrics))
+            .await;
+
+        assert_eq!(metrics.get_sinks_running(), 1);
+    }
+
+    #[tokio::test]
+    async fn should_decrement_metrics_when_leaving_running() {
+        let metrics = Arc::new(Metrics::init());
+        let manager = SinkManager::new(vec![create_test_sink_details("es", 
1)]);
+        metrics.increment_sinks_running();
+
+        manager
+            .update_status("es", ConnectorStatus::Stopped, Some(&metrics))
+            .await;
+
+        assert_eq!(metrics.get_sinks_running(), 0);
+    }
+
+    #[tokio::test]
+    async fn should_clear_error_when_status_becomes_running() {
+        let manager = SinkManager::new(vec![create_test_sink_details("es", 
1)]);
+        manager.set_error("es", "some error").await;
+
+        manager
+            .update_status("es", ConnectorStatus::Running, None)
+            .await;
+
+        let sink = manager.get("es").await.unwrap();
+        let details = sink.lock().await;
+        assert!(details.info.last_error.is_none());
+    }
+
+    #[tokio::test]
+    async fn should_set_error_status_and_message() {
+        let manager = SinkManager::new(vec![create_test_sink_details("es", 
1)]);
+
+        manager.set_error("es", "connection failed").await;
+
+        let sink = manager.get("es").await.unwrap();
+        let details = sink.lock().await;
+        assert_eq!(details.info.status, ConnectorStatus::Error);
+        assert!(details.info.last_error.is_some());
+    }
+
+    #[tokio::test]
+    async fn stop_should_return_not_found_for_unknown_key() {
+        let metrics = Arc::new(Metrics::init());
+        let manager = SinkManager::new(vec![]);
+
+        let result = manager.stop_connector("nonexistent", &metrics).await;
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert!(matches!(err, RuntimeError::SinkNotFound(_)));
+    }
+
+    #[tokio::test]
+    async fn stop_should_send_shutdown_signal_and_update_status() {
+        let metrics = Arc::new(Metrics::init());
+        metrics.increment_sinks_running();
+        let (shutdown_tx, mut shutdown_rx) = watch::channel(());
+        let handle = tokio::spawn(async move {
+            let _ = shutdown_rx.changed().await;
+        });
+        let mut details = create_test_sink_details("es", 1);
+        details.shutdown_tx = Some(shutdown_tx);
+        details.task_handles = vec![handle];
+        let manager = SinkManager::new(vec![details]);
+
+        let result = manager.stop_connector("es", &metrics).await;
+        assert!(result.is_ok());
+
+        let sink = manager.get("es").await.unwrap();
+        let details = sink.lock().await;
+        assert_eq!(details.info.status, ConnectorStatus::Stopped);
+        assert!(details.shutdown_tx.is_none());
+        assert!(details.task_handles.is_empty());
+    }
+
+    #[tokio::test]
+    async fn stop_should_work_without_container() {
+        let metrics = Arc::new(Metrics::init());
+        let mut details = create_test_sink_details("es", 1);
+        details.container = None;
+        details.info.status = ConnectorStatus::Stopped;
+        let manager = SinkManager::new(vec![details]);
+
+        let result = manager.stop_connector("es", &metrics).await;
+        assert!(result.is_ok());
+    }
+
+    #[tokio::test]
+    async fn stop_should_decrement_metrics_from_running() {
+        let metrics = Arc::new(Metrics::init());
+        metrics.increment_sinks_running();
+        let manager = SinkManager::new(vec![create_test_sink_details("es", 
1)]);
+
+        manager.stop_connector("es", &metrics).await.unwrap();
+
+        assert_eq!(metrics.get_sinks_running(), 0);
+    }
+
+    #[tokio::test]
+    async fn should_clear_error_when_status_becomes_stopped() {
+        let manager = SinkManager::new(vec![create_test_sink_details("es", 
1)]);
+        manager.set_error("es", "some error").await;
+
+        manager
+            .update_status("es", ConnectorStatus::Stopped, None)
+            .await;
+
+        let sink = manager.get("es").await.unwrap();
+        let details = sink.lock().await;
+        assert_eq!(details.info.status, ConnectorStatus::Stopped);
+        assert!(details.info.last_error.is_none());
+    }
+
+    #[tokio::test]
+    async fn stop_should_clear_last_error() {
+        let metrics = Arc::new(Metrics::init());
+        let mut details = create_test_sink_details("es", 1);
+        details.info.status = ConnectorStatus::Error;
+        details.info.last_error = Some(ConnectorError::new("previous error"));
+        let manager = SinkManager::new(vec![details]);
+
+        manager.stop_connector("es", &metrics).await.unwrap();
+
+        let sink = manager.get("es").await.unwrap();
+        let details = sink.lock().await;
+        assert!(details.info.last_error.is_none());
+    }
+
+    #[tokio::test]
+    async fn stop_should_not_decrement_metrics_from_non_running() {
+        let metrics = Arc::new(Metrics::init());
+        let mut details = create_test_sink_details("es", 1);
+        details.info.status = ConnectorStatus::Stopped;
+        let manager = SinkManager::new(vec![details]);
+
+        manager.stop_connector("es", &metrics).await.unwrap();
+
+        assert_eq!(metrics.get_sinks_running(), 0);
+    }
+
+    #[tokio::test]
+    async fn update_status_should_be_noop_for_unknown_key() {
+        let manager = SinkManager::new(vec![]);
+
+        manager
+            .update_status("nonexistent", ConnectorStatus::Running, None)
+            .await;
+    }
+
+    #[tokio::test]
+    async fn set_error_should_be_noop_for_unknown_key() {
+        let manager = SinkManager::new(vec![]);
+
+        manager.set_error("nonexistent", "some error").await;
+    }
 }
diff --git a/core/connectors/runtime/src/manager/source.rs 
b/core/connectors/runtime/src/manager/source.rs
index b259fd8cf..14fbb2c0d 100644
--- a/core/connectors/runtime/src/manager/source.rs
+++ b/core/connectors/runtime/src/manager/source.rs
@@ -16,13 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-use crate::configs::connectors::{ConfigFormat, SourceConfig};
+use crate::PLUGIN_ID;
+use crate::SourceApi;
+use crate::configs::connectors::{ConfigFormat, ConnectorsConfigProvider, 
SourceConfig};
+use crate::context::RuntimeContext;
+use crate::error::RuntimeError;
 use crate::metrics::Metrics;
+use crate::source;
+use crate::state::{StateProvider, StateStorage};
 use dashmap::DashMap;
+use dlopen2::wrapper::Container;
+use iggy::prelude::IggyClient;
 use iggy_connector_sdk::api::{ConnectorError, ConnectorStatus};
 use std::collections::HashMap;
+use std::fmt;
 use std::sync::Arc;
+use std::sync::atomic::Ordering;
+use std::time::Duration;
 use tokio::sync::Mutex;
+use tokio::task::JoinHandle;
+use tracing::info;
 
 #[derive(Debug)]
 pub struct SourceManager {
@@ -96,6 +109,173 @@ impl SourceManager {
             source.info.last_error = Some(ConnectorError::new(error_message));
         }
     }
+
+    pub async fn stop_connector_with_guard(
+        &self,
+        key: &str,
+        metrics: &Arc<Metrics>,
+    ) -> Result<(), RuntimeError> {
+        let guard = {
+            let details = self
+                .sources
+                .get(key)
+                .map(|e| e.value().clone())
+                .ok_or_else(|| RuntimeError::SourceNotFound(key.to_string()))?;
+            let details = details.lock().await;
+            details.restart_guard.clone()
+        };
+        let _lock = guard.lock().await;
+        self.stop_connector(key, metrics).await
+    }
+
+    pub async fn stop_connector(
+        &self,
+        key: &str,
+        metrics: &Arc<Metrics>,
+    ) -> Result<(), RuntimeError> {
+        let details = self
+            .sources
+            .get(key)
+            .map(|e| e.value().clone())
+            .ok_or_else(|| RuntimeError::SourceNotFound(key.to_string()))?;
+
+        let (task_handles, plugin_id, container) = {
+            let mut details = details.lock().await;
+            (
+                std::mem::take(&mut details.handler_tasks),
+                details.info.id,
+                details.container.clone(),
+            )
+        };
+
+        source::cleanup_sender(plugin_id);
+
+        for handle in task_handles {
+            let _ = tokio::time::timeout(Duration::from_secs(5), handle).await;
+        }
+
+        if let Some(container) = &container {
+            info!("Closing source connector with ID: {plugin_id} for plugin: 
{key}");
+            (container.iggy_source_close)(plugin_id);
+            info!("Closed source connector with ID: {plugin_id} for plugin: 
{key}");
+        }
+
+        {
+            let mut details = details.lock().await;
+            let old_status = details.info.status;
+            details.info.status = ConnectorStatus::Stopped;
+            details.info.last_error = None;
+            if old_status == ConnectorStatus::Running {
+                metrics.decrement_sources_running();
+            }
+        }
+
+        Ok(())
+    }
+
+    pub async fn start_connector(
+        &self,
+        key: &str,
+        config: &SourceConfig,
+        iggy_client: &IggyClient,
+        metrics: &Arc<Metrics>,
+        state_path: &str,
+        context: &Arc<RuntimeContext>,
+    ) -> Result<(), RuntimeError> {
+        let details = self
+            .sources
+            .get(key)
+            .map(|e| e.value().clone())
+            .ok_or_else(|| RuntimeError::SourceNotFound(key.to_string()))?;
+
+        let container = {
+            let details = details.lock().await;
+            details.container.clone().ok_or_else(|| {
+                RuntimeError::InvalidConfiguration(format!("No container 
loaded for source: {key}"))
+            })?
+        };
+
+        let plugin_id = PLUGIN_ID.fetch_add(1, Ordering::SeqCst);
+
+        let state_storage = source::get_state_storage(state_path, key);
+        let state = match &state_storage {
+            StateStorage::File(file) => file.load().await?,
+        };
+
+        source::init_source(
+            &container,
+            &config.plugin_config.clone().unwrap_or_default(),
+            plugin_id,
+            state,
+        )?;
+        info!("Source connector with ID: {plugin_id} for plugin: {key} 
initialized successfully.");
+
+        let (producer, encoder, transforms) =
+            source::setup_source_producer(key, config, iggy_client).await?;
+
+        let callback = container.iggy_source_handle;
+        let handler_tasks = source::spawn_source_handler(
+            plugin_id,
+            key,
+            config.verbose,
+            producer,
+            encoder,
+            transforms,
+            state_storage,
+            callback,
+            context.clone(),
+        );
+
+        {
+            let mut details = details.lock().await;
+            details.info.id = plugin_id;
+            details.info.status = ConnectorStatus::Running;
+            details.info.last_error = None;
+            details.config = config.clone();
+            details.handler_tasks = handler_tasks;
+            metrics.increment_sources_running();
+        }
+
+        Ok(())
+    }
+
+    pub async fn restart_connector(
+        &self,
+        key: &str,
+        config_provider: &dyn ConnectorsConfigProvider,
+        iggy_client: &IggyClient,
+        metrics: &Arc<Metrics>,
+        state_path: &str,
+        context: &Arc<RuntimeContext>,
+    ) -> Result<(), RuntimeError> {
+        let guard = {
+            let details = self
+                .sources
+                .get(key)
+                .map(|e| e.value().clone())
+                .ok_or_else(|| RuntimeError::SourceNotFound(key.to_string()))?;
+            let details = details.lock().await;
+            details.restart_guard.clone()
+        };
+        let Ok(_lock) = guard.try_lock() else {
+            info!("Restart already in progress for source connector: {key}, 
skipping.");
+            return Ok(());
+        };
+
+        info!("Restarting source connector: {key}");
+        self.stop_connector(key, metrics).await?;
+
+        let config = config_provider
+            .get_source_config(key, None)
+            .await
+            .map_err(|e| RuntimeError::InvalidConfiguration(e.to_string()))?
+            .ok_or_else(|| RuntimeError::SourceNotFound(key.to_string()))?;
+
+        self.start_connector(key, &config, iggy_client, metrics, state_path, 
context)
+            .await?;
+        info!("Source connector: {key} restarted successfully.");
+        Ok(())
+    }
 }
 
 #[derive(Debug, Clone)]
@@ -111,8 +291,295 @@ pub struct SourceInfo {
     pub plugin_config_format: Option<ConfigFormat>,
 }
 
-#[derive(Debug)]
 pub struct SourceDetails {
     pub info: SourceInfo,
     pub config: SourceConfig,
+    pub handler_tasks: Vec<JoinHandle<()>>,
+    pub container: Option<Arc<Container<SourceApi>>>,
+    pub restart_guard: Arc<Mutex<()>>,
+}
+
+impl fmt::Debug for SourceDetails {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("SourceDetails")
+            .field("info", &self.info)
+            .field("config", &self.config)
+            .field("container", &self.container.as_ref().map(|_| "..."))
+            .finish()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::configs::connectors::SourceConfig;
+
+    fn create_test_source_info(key: &str, id: u32) -> SourceInfo {
+        SourceInfo {
+            id,
+            key: key.to_string(),
+            name: format!("{key} source"),
+            path: format!("/path/to/{key}"),
+            version: "1.0.0".to_string(),
+            enabled: true,
+            status: ConnectorStatus::Running,
+            last_error: None,
+            plugin_config_format: None,
+        }
+    }
+
+    fn create_test_source_details(key: &str, id: u32) -> SourceDetails {
+        SourceDetails {
+            info: create_test_source_info(key, id),
+            config: SourceConfig {
+                key: key.to_string(),
+                enabled: true,
+                version: 1,
+                name: format!("{key} source"),
+                path: format!("/path/to/{key}"),
+                ..Default::default()
+            },
+            handler_tasks: vec![],
+            container: None,
+            restart_guard: Arc::new(Mutex::new(())),
+        }
+    }
+
+    #[tokio::test]
+    async fn should_create_manager_with_sources() {
+        let manager = SourceManager::new(vec![
+            create_test_source_details("pg", 1),
+            create_test_source_details("random", 2),
+        ]);
+
+        let all = manager.get_all().await;
+        assert_eq!(all.len(), 2);
+    }
+
+    #[tokio::test]
+    async fn should_get_existing_source() {
+        let manager = SourceManager::new(vec![create_test_source_details("pg", 
1)]);
+
+        let source = manager.get("pg").await;
+        assert!(source.is_some());
+        let binding = source.unwrap();
+        let details = binding.lock().await;
+        assert_eq!(details.info.key, "pg");
+        assert_eq!(details.info.id, 1);
+    }
+
+    #[tokio::test]
+    async fn should_return_none_for_unknown_key() {
+        let manager = SourceManager::new(vec![create_test_source_details("pg", 
1)]);
+
+        assert!(manager.get("nonexistent").await.is_none());
+    }
+
+    #[tokio::test]
+    async fn should_get_config() {
+        let manager = SourceManager::new(vec![create_test_source_details("pg", 
1)]);
+
+        let config = manager.get_config("pg").await;
+        assert!(config.is_some());
+        assert_eq!(config.unwrap().key, "pg");
+    }
+
+    #[tokio::test]
+    async fn should_return_none_config_for_unknown_key() {
+        let manager = SourceManager::new(vec![]);
+
+        assert!(manager.get_config("nonexistent").await.is_none());
+    }
+
+    #[tokio::test]
+    async fn should_get_all_sources() {
+        let manager = SourceManager::new(vec![
+            create_test_source_details("pg", 1),
+            create_test_source_details("random", 2),
+            create_test_source_details("es", 3),
+        ]);
+
+        let all = manager.get_all().await;
+        assert_eq!(all.len(), 3);
+        let keys: Vec<String> = all.iter().map(|s| s.key.clone()).collect();
+        assert!(keys.contains(&"pg".to_string()));
+        assert!(keys.contains(&"random".to_string()));
+        assert!(keys.contains(&"es".to_string()));
+    }
+
+    #[tokio::test]
+    async fn should_update_status() {
+        let manager = SourceManager::new(vec![create_test_source_details("pg", 
1)]);
+
+        manager
+            .update_status("pg", ConnectorStatus::Stopped, None)
+            .await;
+
+        let source = manager.get("pg").await.unwrap();
+        let details = source.lock().await;
+        assert_eq!(details.info.status, ConnectorStatus::Stopped);
+    }
+
+    #[tokio::test]
+    async fn should_increment_metrics_when_transitioning_to_running() {
+        let metrics = Arc::new(Metrics::init());
+        let mut details = create_test_source_details("pg", 1);
+        details.info.status = ConnectorStatus::Stopped;
+        let manager = SourceManager::new(vec![details]);
+
+        manager
+            .update_status("pg", ConnectorStatus::Running, Some(&metrics))
+            .await;
+
+        assert_eq!(metrics.get_sources_running(), 1);
+    }
+
+    #[tokio::test]
+    async fn should_decrement_metrics_when_leaving_running() {
+        let metrics = Arc::new(Metrics::init());
+        let manager = SourceManager::new(vec![create_test_source_details("pg", 
1)]);
+        metrics.increment_sources_running();
+
+        manager
+            .update_status("pg", ConnectorStatus::Stopped, Some(&metrics))
+            .await;
+
+        assert_eq!(metrics.get_sources_running(), 0);
+    }
+
+    #[tokio::test]
+    async fn should_clear_error_when_status_becomes_running() {
+        let manager = SourceManager::new(vec![create_test_source_details("pg", 
1)]);
+        manager.set_error("pg", "some error").await;
+
+        manager
+            .update_status("pg", ConnectorStatus::Running, None)
+            .await;
+
+        let source = manager.get("pg").await.unwrap();
+        let details = source.lock().await;
+        assert!(details.info.last_error.is_none());
+    }
+
+    #[tokio::test]
+    async fn should_set_error_status_and_message() {
+        let manager = SourceManager::new(vec![create_test_source_details("pg", 
1)]);
+
+        manager.set_error("pg", "connection failed").await;
+
+        let source = manager.get("pg").await.unwrap();
+        let details = source.lock().await;
+        assert_eq!(details.info.status, ConnectorStatus::Error);
+        assert!(details.info.last_error.is_some());
+    }
+
+    #[tokio::test]
+    async fn stop_should_return_not_found_for_unknown_key() {
+        let metrics = Arc::new(Metrics::init());
+        let manager = SourceManager::new(vec![]);
+
+        let result = manager.stop_connector("nonexistent", &metrics).await;
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert!(matches!(err, RuntimeError::SourceNotFound(_)));
+    }
+
+    #[tokio::test]
+    async fn stop_should_drain_tasks_and_update_status() {
+        let metrics = Arc::new(Metrics::init());
+        metrics.increment_sources_running();
+        let handle = tokio::spawn(async {});
+        let mut details = create_test_source_details("pg", 1);
+        details.handler_tasks = vec![handle];
+        let manager = SourceManager::new(vec![details]);
+
+        let result = manager.stop_connector("pg", &metrics).await;
+        assert!(result.is_ok());
+
+        let source = manager.get("pg").await.unwrap();
+        let details = source.lock().await;
+        assert_eq!(details.info.status, ConnectorStatus::Stopped);
+        assert!(details.handler_tasks.is_empty());
+    }
+
+    #[tokio::test]
+    async fn stop_should_work_without_container() {
+        let metrics = Arc::new(Metrics::init());
+        let mut details = create_test_source_details("pg", 1);
+        details.container = None;
+        details.info.status = ConnectorStatus::Stopped;
+        let manager = SourceManager::new(vec![details]);
+
+        let result = manager.stop_connector("pg", &metrics).await;
+        assert!(result.is_ok());
+    }
+
+    #[tokio::test]
+    async fn stop_should_decrement_metrics_from_running() {
+        let metrics = Arc::new(Metrics::init());
+        metrics.increment_sources_running();
+        let manager = SourceManager::new(vec![create_test_source_details("pg", 
1)]);
+
+        manager.stop_connector("pg", &metrics).await.unwrap();
+
+        assert_eq!(metrics.get_sources_running(), 0);
+    }
+
+    #[tokio::test]
+    async fn should_clear_error_when_status_becomes_stopped() {
+        let manager = SourceManager::new(vec![create_test_source_details("pg", 
1)]);
+        manager.set_error("pg", "some error").await;
+
+        manager
+            .update_status("pg", ConnectorStatus::Stopped, None)
+            .await;
+
+        let source = manager.get("pg").await.unwrap();
+        let details = source.lock().await;
+        assert_eq!(details.info.status, ConnectorStatus::Stopped);
+        assert!(details.info.last_error.is_none());
+    }
+
+    #[tokio::test]
+    async fn stop_should_clear_last_error() {
+        let metrics = Arc::new(Metrics::init());
+        let mut details = create_test_source_details("pg", 1);
+        details.info.status = ConnectorStatus::Error;
+        details.info.last_error = Some(ConnectorError::new("previous error"));
+        let manager = SourceManager::new(vec![details]);
+
+        manager.stop_connector("pg", &metrics).await.unwrap();
+
+        let source = manager.get("pg").await.unwrap();
+        let details = source.lock().await;
+        assert!(details.info.last_error.is_none());
+    }
+
+    #[tokio::test]
+    async fn stop_should_not_decrement_metrics_from_non_running() {
+        let metrics = Arc::new(Metrics::init());
+        let mut details = create_test_source_details("pg", 1);
+        details.info.status = ConnectorStatus::Stopped;
+        let manager = SourceManager::new(vec![details]);
+
+        manager.stop_connector("pg", &metrics).await.unwrap();
+
+        assert_eq!(metrics.get_sources_running(), 0);
+    }
+
+    #[tokio::test]
+    async fn update_status_should_be_noop_for_unknown_key() {
+        let manager = SourceManager::new(vec![]);
+
+        manager
+            .update_status("nonexistent", ConnectorStatus::Running, None)
+            .await;
+    }
+
+    #[tokio::test]
+    async fn set_error_should_be_noop_for_unknown_key() {
+        let manager = SourceManager::new(vec![]);
+
+        manager.set_error("nonexistent", "some error").await;
+    }
 }
diff --git a/core/connectors/runtime/src/sink.rs 
b/core/connectors/runtime/src/sink.rs
index 1e82703fa..2074f845c 100644
--- a/core/connectors/runtime/src/sink.rs
+++ b/core/connectors/runtime/src/sink.rs
@@ -31,7 +31,6 @@ use iggy::prelude::{
     AutoCommit, AutoCommitWhen, IggyClient, IggyConsumer, IggyDuration, 
IggyMessage,
     PollingStrategy,
 };
-use iggy_connector_sdk::api::ConnectorStatus;
 use iggy_connector_sdk::{
     DecodedMessage, MessagesMetadata, RawMessage, RawMessages, 
ReceivedMessage, StreamDecoder,
     TopicMetadata, sink::ConsumeCallback, transforms::Transform,
@@ -42,6 +41,8 @@ use std::{
     sync::{Arc, atomic::Ordering},
     time::Instant,
 };
+use tokio::sync::watch;
+use tokio::task::JoinHandle;
 use tracing::{debug, error, info, warn};
 
 pub async fn init(
@@ -50,7 +51,7 @@ pub async fn init(
 ) -> Result<HashMap<String, SinkConnector>, RuntimeError> {
     let mut sink_connectors: HashMap<String, SinkConnector> = HashMap::new();
     for (key, config) in sink_configs {
-        let name = config.name;
+        let name = config.name.clone();
         if !config.enabled {
             warn!("Sink: {name} is disabled ({key})");
             continue;
@@ -68,7 +69,7 @@ pub async fn init(
             let version = get_plugin_version(&container.container);
             init_error = init_sink(
                 &container.container,
-                &config.plugin_config.unwrap_or_default(),
+                &config.plugin_config.clone().unwrap_or_default(),
                 plugin_id,
             )
             .err()
@@ -96,7 +97,7 @@ pub async fn init(
             let version = get_plugin_version(&container);
             init_error = init_sink(
                 &container,
-                &config.plugin_config.unwrap_or_default(),
+                &config.plugin_config.clone().unwrap_or_default(),
                 plugin_id,
             )
             .err()
@@ -129,21 +130,7 @@ pub async fn init(
             );
         }
 
-        let transforms = if let Some(transforms_config) = config.transforms {
-            let transforms = 
transform::load(&transforms_config).map_err(|error| {
-                RuntimeError::InvalidConfiguration(format!("Failed to load 
transforms: {error}"))
-            })?;
-            let types = transforms
-                .iter()
-                .map(|t| t.r#type().into())
-                .collect::<Vec<&'static str>>()
-                .join(", ");
-            info!("Enabled transforms for sink: {name} ({key}): {types}",);
-            transforms
-        } else {
-            vec![]
-        };
-
+        let consumers = setup_sink_consumers(&key, &config, 
iggy_client).await?;
         let connector = sink_connectors.get_mut(&path).ok_or_else(|| {
             RuntimeError::InvalidConfiguration(format!("Sink connector not 
found for path: {path}"))
         })?;
@@ -156,46 +143,24 @@ pub async fn init(
                     "Sink plugin not found for ID: {plugin_id}"
                 ))
             })?;
-
-        for stream in config.streams.iter() {
-            let poll_interval = IggyDuration::from_str(
-                stream.poll_interval.as_deref().unwrap_or("5ms"),
-            )
-            .map_err(|error| {
-                RuntimeError::InvalidConfiguration(format!("Invalid poll 
interval: {error}"))
-            })?;
-            let default_consumer_group = format!("iggy-connect-sink-{key}");
-            let consumer_group = stream
-                .consumer_group
-                .as_deref()
-                .unwrap_or(&default_consumer_group);
-            let batch_length = stream.batch_length.unwrap_or(1000);
-            for topic in stream.topics.iter() {
-                let mut consumer = iggy_client
-                    .consumer_group(consumer_group, &stream.stream, topic)?
-                    
.auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
-                    .create_consumer_group_if_not_exists()
-                    .auto_join_consumer_group()
-                    .polling_strategy(PollingStrategy::next())
-                    .poll_interval(poll_interval)
-                    .batch_length(batch_length)
-                    .build();
-
-                consumer.init().await?;
-                plugin.consumers.push(SinkConnectorConsumer {
-                    consumer,
-                    decoder: stream.schema.decoder(),
-                    batch_size: batch_length,
-                    transforms: transforms.clone(),
-                });
-            }
+        for (consumer, decoder, batch_size, transforms) in consumers {
+            plugin.consumers.push(SinkConnectorConsumer {
+                consumer,
+                decoder,
+                batch_size,
+                transforms,
+            });
         }
     }
 
     Ok(sink_connectors)
 }
 
-pub fn consume(sinks: Vec<SinkConnectorWrapper>, context: Arc<RuntimeContext>) 
{
+pub fn consume(
+    sinks: Vec<SinkConnectorWrapper>,
+    context: Arc<RuntimeContext>,
+) -> Vec<(String, watch::Sender<()>, Vec<JoinHandle<()>>)> {
+    let mut handles = Vec::new();
     for sink in sinks {
         for plugin in sink.plugins {
             if let Some(error) = &plugin.error {
@@ -206,56 +171,79 @@ pub fn consume(sinks: Vec<SinkConnectorWrapper>, context: 
Arc<RuntimeContext>) {
                 continue;
             }
             info!("Starting consume for sink with ID: {}...", plugin.id);
-            for consumer in plugin.consumers {
-                let plugin_key = plugin.key.clone();
-                let context = context.clone();
-
-                tokio::spawn(async move {
-                    context
-                        .sinks
-                        .update_status(
-                            &plugin_key,
-                            ConnectorStatus::Running,
-                            Some(&context.metrics),
-                        )
-                        .await;
-
-                    if let Err(error) = consume_messages(
-                        plugin.id,
-                        consumer.decoder,
-                        consumer.batch_size,
-                        sink.callback,
-                        consumer.transforms,
-                        consumer.consumer,
-                        plugin.verbose,
-                        &plugin_key,
-                        &context.metrics,
-                    )
-                    .await
-                    {
-                        let error_msg = format!(
-                            "Failed to consume messages for sink connector 
with ID: {}. {error}",
-                            plugin.id
-                        );
-                        error!("{error_msg}");
-                        context
-                            .metrics
-                            .increment_errors(&plugin_key, 
ConnectorType::Sink);
-                        context.sinks.set_error(&plugin_key, &error_msg).await;
-                        return;
-                    }
-                    info!(
-                        "Consume messages for sink connector with ID: {} 
started successfully.",
-                        plugin.id
-                    );
-                });
-            }
+            let consumers = plugin
+                .consumers
+                .into_iter()
+                .map(|c| (c.consumer, c.decoder, c.batch_size, c.transforms))
+                .collect();
+            let (shutdown_tx, task_handles) = spawn_consume_tasks(
+                plugin.id,
+                &plugin.key,
+                consumers,
+                sink.callback,
+                plugin.verbose,
+                &context.metrics,
+                context.clone(),
+            );
+            handles.push((plugin.key, shutdown_tx, task_handles));
         }
     }
+    handles
+}
+
+#[allow(clippy::type_complexity)]
+pub(crate) fn spawn_consume_tasks(
+    plugin_id: u32,
+    plugin_key: &str,
+    consumers: Vec<(
+        IggyConsumer,
+        Arc<dyn StreamDecoder>,
+        u32,
+        Vec<Arc<dyn Transform>>,
+    )>,
+    callback: ConsumeCallback,
+    verbose: bool,
+    metrics: &Arc<Metrics>,
+    context: Arc<RuntimeContext>,
+) -> (watch::Sender<()>, Vec<JoinHandle<()>>) {
+    let (shutdown_tx, shutdown_rx) = watch::channel(());
+    let mut task_handles = Vec::new();
+    for (consumer, decoder, batch_size, transforms) in consumers {
+        let plugin_key = plugin_key.to_string();
+        let metrics = metrics.clone();
+        let shutdown_rx = shutdown_rx.clone();
+        let context = context.clone();
+        let handle = tokio::spawn(async move {
+            if let Err(error) = consume_messages(
+                plugin_id,
+                decoder,
+                batch_size,
+                callback,
+                transforms,
+                consumer,
+                verbose,
+                &plugin_key,
+                &metrics,
+                shutdown_rx,
+            )
+            .await
+            {
+                error!(
+                    "Failed to consume messages for sink connector with ID: 
{plugin_id}: {error}"
+                );
+                context
+                    .sinks
+                    .set_error(&plugin_key, &error.to_string())
+                    .await;
+            }
+        });
+        task_handles.push(handle);
+    }
+    (shutdown_tx, task_handles)
 }
 
 #[allow(clippy::too_many_arguments)]
-async fn consume_messages(
+pub(crate) async fn consume_messages(
     plugin_id: u32,
     decoder: Arc<dyn StreamDecoder>,
     batch_size: u32,
@@ -265,6 +253,7 @@ async fn consume_messages(
     verbose: bool,
     plugin_key: &str,
     metrics: &Arc<Metrics>,
+    mut shutdown_rx: watch::Receiver<()>,
 ) -> Result<(), RuntimeError> {
     info!("Started consuming messages for sink connector with ID: 
{plugin_id}");
     let batch_size = batch_size as usize;
@@ -274,7 +263,18 @@ async fn consume_messages(
         topic: consumer.topic().to_string(),
     };
 
-    while let Some(message) = consumer.next().await {
+    loop {
+        let message = tokio::select! {
+            _ = shutdown_rx.changed() => {
+                info!("Sink connector with ID: {plugin_id} received shutdown 
signal");
+                break;
+            }
+            msg = consumer.next() => msg,
+        };
+
+        let Some(message) = message else {
+            break;
+        };
         let Ok(message) = message else {
             error!("Failed to receive message.");
             continue;
@@ -356,7 +356,7 @@ fn get_plugin_version(container: &Container<SinkApi>) -> 
String {
     }
 }
 
-fn init_sink(
+pub(crate) fn init_sink(
     container: &Container<SinkApi>,
     plugin_config: &serde_json::Value,
     id: u32,
@@ -377,6 +377,67 @@ fn init_sink(
     }
 }
 
+pub(crate) async fn setup_sink_consumers(
+    key: &str,
+    config: &SinkConfig,
+    iggy_client: &IggyClient,
+) -> Result<
+    Vec<(
+        IggyConsumer,
+        Arc<dyn StreamDecoder>,
+        u32,
+        Vec<Arc<dyn Transform>>,
+    )>,
+    RuntimeError,
+> {
+    let transforms = if let Some(transforms_config) = &config.transforms {
+        let loaded = transform::load(transforms_config).map_err(|error| {
+            RuntimeError::InvalidConfiguration(format!("Failed to load 
transforms: {error}"))
+        })?;
+        for t in &loaded {
+            info!("Loaded transform: {:?} for sink: {key}", t.r#type());
+        }
+        loaded
+    } else {
+        vec![]
+    };
+
+    let mut consumers = Vec::new();
+    for stream in config.streams.iter() {
+        let poll_interval = IggyDuration::from_str(
+            stream.poll_interval.as_deref().unwrap_or("5ms"),
+        )
+        .map_err(|error| {
+            RuntimeError::InvalidConfiguration(format!("Invalid poll interval: 
{error}"))
+        })?;
+        let default_consumer_group = format!("iggy-connect-sink-{key}");
+        let consumer_group = stream
+            .consumer_group
+            .as_deref()
+            .unwrap_or(&default_consumer_group);
+        let batch_length = stream.batch_length.unwrap_or(1000);
+        for topic in stream.topics.iter() {
+            let mut consumer = iggy_client
+                .consumer_group(consumer_group, &stream.stream, topic)?
+                .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
+                .create_consumer_group_if_not_exists()
+                .auto_join_consumer_group()
+                .polling_strategy(PollingStrategy::next())
+                .poll_interval(poll_interval)
+                .batch_length(batch_length)
+                .build();
+            consumer.init().await?;
+            consumers.push((
+                consumer,
+                stream.schema.decoder(),
+                batch_length,
+                transforms.clone(),
+            ));
+        }
+    }
+    Ok(consumers)
+}
+
 async fn process_messages(
     plugin_id: u32,
     messages_metadata: MessagesMetadata,
diff --git a/core/connectors/runtime/src/source.rs 
b/core/connectors/runtime/src/source.rs
index 8fe422510..51ec06026 100644
--- a/core/connectors/runtime/src/source.rs
+++ b/core/connectors/runtime/src/source.rs
@@ -22,10 +22,11 @@ use dlopen2::wrapper::Container;
 use flume::{Receiver, Sender};
 use iggy::prelude::{
     DirectConfig, HeaderKey, HeaderValue, IggyClient, IggyDuration, IggyError, 
IggyMessage,
+    IggyProducer,
 };
 use iggy_connector_sdk::{
     ConnectorState, DecodedMessage, Error, ProducedMessages, StreamEncoder, 
TopicMetadata,
-    transforms::Transform,
+    source::HandleCallback, transforms::Transform,
 };
 use once_cell::sync::Lazy;
 use std::{
@@ -46,6 +47,7 @@ use crate::{
     transform,
 };
 use iggy_connector_sdk::api::ConnectorStatus;
+use tokio::task::JoinHandle;
 
 pub static SOURCE_SENDERS: Lazy<DashMap<u32, Sender<ProducedMessages>>> = 
Lazy::new(DashMap::new);
 
@@ -60,7 +62,7 @@ pub async fn init(
 ) -> Result<HashMap<String, SourceConnector>, RuntimeError> {
     let mut source_connectors: HashMap<String, SourceConnector> = 
HashMap::new();
     for (key, config) in source_configs {
-        let name = config.name;
+        let name = config.name.clone();
         if !config.enabled {
             warn!("Source: {name} is disabled ({key})");
             continue;
@@ -82,7 +84,7 @@ pub async fn init(
             let version = get_plugin_version(&container.container);
             init_error = init_source(
                 &container.container,
-                &config.plugin_config.unwrap_or_default(),
+                &config.plugin_config.clone().unwrap_or_default(),
                 plugin_id,
                 state,
             )
@@ -113,7 +115,7 @@ pub async fn init(
             let version = get_plugin_version(&container);
             init_error = init_source(
                 &container,
-                &config.plugin_config.unwrap_or_default(),
+                &config.plugin_config.clone().unwrap_or_default(),
                 plugin_id,
                 state,
             )
@@ -149,20 +151,8 @@ pub async fn init(
             );
         }
 
-        let transforms = if let Some(transforms_config) = config.transforms {
-            let transforms = 
transform::load(&transforms_config).map_err(|error| {
-                RuntimeError::InvalidConfiguration(format!("Failed to load 
transforms: {error}"))
-            })?;
-            let types = transforms
-                .iter()
-                .map(|t| t.r#type().into())
-                .collect::<Vec<&'static str>>()
-                .join(", ");
-            info!("Enabled transforms for source: {name} ({key}): {types}",);
-            transforms
-        } else {
-            vec![]
-        };
+        let (producer, encoder, transforms) =
+            setup_source_producer(&key, &config, iggy_client).await?;
 
         let connector = source_connectors.get_mut(&path).ok_or_else(|| {
             RuntimeError::InvalidConfiguration(format!(
@@ -178,32 +168,8 @@ pub async fn init(
                     "Source plugin not found for ID: {plugin_id}"
                 ))
             })?;
-
-        for stream in config.streams.iter() {
-            let linger_time = IggyDuration::from_str(
-                stream.linger_time.as_deref().unwrap_or("5ms"),
-            )
-            .map_err(|error| {
-                RuntimeError::InvalidConfiguration(format!("Invalid linger 
time: {error}"))
-            })?;
-            let batch_length = stream.batch_length.unwrap_or(1000);
-            let producer = iggy_client
-                .producer(&stream.stream, &stream.topic)?
-                .direct(
-                    DirectConfig::builder()
-                        .batch_length(batch_length)
-                        .linger_time(linger_time)
-                        .build(),
-                )
-                .build();
-
-            producer.init().await?;
-            plugin.producer = Some(SourceConnectorProducer {
-                producer,
-                encoder: stream.schema.encoder(),
-            });
-            plugin.transforms = transforms.clone();
-        }
+        plugin.producer = Some(SourceConnectorProducer { producer, encoder });
+        plugin.transforms = transforms;
     }
 
     Ok(source_connectors)
@@ -218,7 +184,7 @@ fn get_plugin_version(container: &Container<SourceApi>) -> 
String {
     }
 }
 
-fn init_source(
+pub(crate) fn init_source(
     container: &Container<SourceApi>,
     plugin_config: &serde_json::Value,
     id: u32,
@@ -246,21 +212,256 @@ fn init_source(
     }
 }
 
-fn get_state_storage(state_path: &str, key: &str) -> StateStorage {
+pub(crate) fn get_state_storage(state_path: &str, key: &str) -> StateStorage {
     let path = format!("{state_path}/source_{key}.state");
     StateStorage::File(FileStateProvider::new(path))
 }
 
+pub(crate) async fn setup_source_producer(
+    key: &str,
+    config: &SourceConfig,
+    iggy_client: &IggyClient,
+) -> Result<
+    (
+        IggyProducer,
+        Arc<dyn StreamEncoder>,
+        Vec<Arc<dyn Transform>>,
+    ),
+    RuntimeError,
+> {
+    let transforms = if let Some(transforms_config) = &config.transforms {
+        let loaded = transform::load(transforms_config).map_err(|error| {
+            RuntimeError::InvalidConfiguration(format!("Failed to load 
transforms: {error}"))
+        })?;
+        for t in &loaded {
+            info!("Loaded transform: {:?} for source: {key}", t.r#type());
+        }
+        loaded
+    } else {
+        vec![]
+    };
+
+    let mut last_producer = None;
+    let mut last_encoder = None;
+    for stream in config.streams.iter() {
+        let linger_time = 
IggyDuration::from_str(stream.linger_time.as_deref().unwrap_or("5ms"))
+            .map_err(|error| {
+                RuntimeError::InvalidConfiguration(format!("Invalid linger 
time: {error}"))
+            })?;
+        let batch_length = stream.batch_length.unwrap_or(1000);
+        let producer = iggy_client
+            .producer(&stream.stream, &stream.topic)?
+            .direct(
+                DirectConfig::builder()
+                    .batch_length(batch_length)
+                    .linger_time(linger_time)
+                    .build(),
+            )
+            .build();
+        producer.init().await?;
+        last_encoder = Some(stream.schema.encoder());
+        last_producer = Some(producer);
+    }
+
+    let producer = last_producer.ok_or_else(|| {
+        RuntimeError::InvalidConfiguration("No streams configured for 
source".to_string())
+    })?;
+    let encoder = last_encoder.ok_or_else(|| {
+        RuntimeError::InvalidConfiguration("No encoder configured for 
source".to_string())
+    })?;
+
+    Ok((producer, encoder, transforms))
+}
+
+#[allow(clippy::too_many_arguments)]
+pub(crate) async fn source_forwarding_loop(
+    plugin_id: u32,
+    plugin_key: String,
+    verbose: bool,
+    producer: IggyProducer,
+    encoder: Arc<dyn StreamEncoder>,
+    transforms: Vec<Arc<dyn Transform>>,
+    state_storage: StateStorage,
+    receiver: Receiver<ProducedMessages>,
+    context: Arc<RuntimeContext>,
+) {
+    info!("Source connector with ID: {plugin_id} started.");
+    context
+        .sources
+        .update_status(
+            &plugin_key,
+            ConnectorStatus::Running,
+            Some(&context.metrics),
+        )
+        .await;
+
+    let mut number = 1u64;
+    let topic_metadata = TopicMetadata {
+        stream: producer.stream().to_string(),
+        topic: producer.topic().to_string(),
+    };
+
+    while let Ok(produced_messages) = receiver.recv_async().await {
+        let count = produced_messages.messages.len();
+        context
+            .metrics
+            .increment_messages_produced(&plugin_key, count as u64);
+        if verbose {
+            info!("Source connector with ID: {plugin_id} received {count} 
messages");
+        } else {
+            debug!("Source connector with ID: {plugin_id} received {count} 
messages");
+        }
+        let schema = produced_messages.schema;
+        let mut messages: Vec<DecodedMessage> = Vec::with_capacity(count);
+        for message in produced_messages.messages {
+            let Ok(payload) = schema.try_into_payload(message.payload) else {
+                error!(
+                    "Failed to decode message payload with schema: {schema} 
for source connector with ID: {plugin_id}",
+                );
+                continue;
+            };
+
+            debug!(
+                "Source connector with ID: {plugin_id}] received message: 
{number} | schema: {schema} | payload: {payload}"
+            );
+            messages.push(DecodedMessage {
+                id: message.id,
+                offset: None,
+                headers: message.headers,
+                checksum: message.checksum,
+                timestamp: message.timestamp,
+                origin_timestamp: message.origin_timestamp,
+                payload,
+            });
+            number += 1;
+        }
+
+        let Ok(iggy_messages) =
+            process_messages(plugin_id, &encoder, &topic_metadata, messages, 
&transforms)
+        else {
+            let error_msg = format!(
+                "Failed to process {count} messages by source connector with 
ID: {plugin_id} before sending them to stream: {}, topic: {}.",
+                producer.stream(),
+                producer.topic()
+            );
+            error!("{error_msg}");
+            context
+                .metrics
+                .increment_errors(&plugin_key, ConnectorType::Source);
+            context.sources.set_error(&plugin_key, &error_msg).await;
+            continue;
+        };
+
+        if let Err(error) = producer.send(iggy_messages).await {
+            let error_msg = format!(
+                "Failed to send {count} messages to stream: {}, topic: {} by 
source connector with ID: {plugin_id}. {error}",
+                producer.stream(),
+                producer.topic(),
+            );
+            error!("{error_msg}");
+            context
+                .metrics
+                .increment_errors(&plugin_key, ConnectorType::Source);
+            context.sources.set_error(&plugin_key, &error_msg).await;
+            continue;
+        }
+
+        context
+            .metrics
+            .increment_messages_sent(&plugin_key, count as u64);
+
+        if verbose {
+            info!(
+                "Sent {count} messages to stream: {}, topic: {} by source 
connector with ID: {plugin_id}",
+                producer.stream(),
+                producer.topic()
+            );
+        } else {
+            debug!(
+                "Sent {count} messages to stream: {}, topic: {} by source 
connector with ID: {plugin_id}",
+                producer.stream(),
+                producer.topic()
+            );
+        }
+
+        let Some(state) = produced_messages.state else {
+            debug!("No state provided for source connector with ID: 
{plugin_id}");
+            continue;
+        };
+
+        match &state_storage {
+            StateStorage::File(file) => {
+                if let Err(error) = file.save(state).await {
+                    let error_msg = format!(
+                        "Failed to save state for source connector with ID: 
{plugin_id}. {error}"
+                    );
+                    error!("{error_msg}");
+                    context.sources.set_error(&plugin_key, &error_msg).await;
+                    continue;
+                }
+                debug!("State saved for source connector with ID: 
{plugin_id}");
+            }
+        }
+    }
+
+    info!("Source connector with ID: {plugin_id} stopped.");
+    context
+        .sources
+        .update_status(
+            &plugin_key,
+            ConnectorStatus::Stopped,
+            Some(&context.metrics),
+        )
+        .await;
+}
+
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn spawn_source_handler(
+    plugin_id: u32,
+    plugin_key: &str,
+    verbose: bool,
+    producer: IggyProducer,
+    encoder: Arc<dyn StreamEncoder>,
+    transforms: Vec<Arc<dyn Transform>>,
+    state_storage: StateStorage,
+    callback: HandleCallback,
+    context: Arc<RuntimeContext>,
+) -> Vec<JoinHandle<()>> {
+    let (sender, receiver) = flume::unbounded();
+    SOURCE_SENDERS.insert(plugin_id, sender);
+
+    let blocking_handle = tokio::task::spawn_blocking(move || {
+        callback(plugin_id, handle_produced_messages);
+    });
+
+    let plugin_key = plugin_key.to_string();
+    let handler_task = tokio::spawn(async move {
+        source_forwarding_loop(
+            plugin_id,
+            plugin_key,
+            verbose,
+            producer,
+            encoder,
+            transforms,
+            state_storage,
+            receiver,
+            context,
+        )
+        .await;
+    });
+
+    vec![blocking_handle, handler_task]
+}
+
 pub fn handle(
     sources: Vec<SourceConnectorWrapper>,
     context: Arc<RuntimeContext>,
-) -> Vec<tokio::task::JoinHandle<()>> {
-    let mut handler_tasks = Vec::new();
+) -> Vec<(String, Vec<JoinHandle<()>>)> {
+    let mut handles = Vec::new();
     for source in sources {
         for plugin in source.plugins {
             let plugin_id = plugin.id;
             let plugin_key = plugin.key.clone();
-            let context = context.clone();
 
             if let Some(error) = &plugin.error {
                 error!(
@@ -270,165 +471,27 @@ pub fn handle(
             }
             info!("Starting handler for source connector with ID: 
{plugin_id}...");
 
-            let handle = source.callback;
-            tokio::task::spawn_blocking(move || {
-                handle(plugin_id, handle_produced_messages);
-            });
-            info!("Handler for source connector with ID: {plugin_id} started 
successfully.");
-
-            let (sender, receiver): (Sender<ProducedMessages>, 
Receiver<ProducedMessages>) =
-                flume::unbounded();
-            SOURCE_SENDERS.insert(plugin_id, sender);
-            let handler_task = tokio::spawn(async move {
-                info!("Source connector with ID: {plugin_id} started.");
-                let Some(producer) = &plugin.producer else {
-                    error!("Producer not initialized for source connector with 
ID: {plugin_id}");
-                    context
-                        .sources
-                        .set_error(&plugin_key, "Producer not initialized")
-                        .await;
-                    return;
-                };
-
-                context
-                    .sources
-                    .update_status(
-                        &plugin_key,
-                        ConnectorStatus::Running,
-                        Some(&context.metrics),
-                    )
-                    .await;
-                let encoder = producer.encoder.clone();
-                let producer = &producer.producer;
-                let mut number = 1u64;
-
-                let topic_metadata = TopicMetadata {
-                    stream: producer.stream().to_string(),
-                    topic: producer.topic().to_string(),
-                };
-
-                while let Ok(produced_messages) = receiver.recv_async().await {
-                    let count = produced_messages.messages.len();
-                    context
-                        .metrics
-                        .increment_messages_produced(&plugin_key, count as 
u64);
-                    if plugin.verbose {
-                        info!("Source connector with ID: {plugin_id} received 
{count} messages");
-                    } else {
-                        debug!("Source connector with ID: {plugin_id} received 
{count} messages");
-                    }
-                    let schema = produced_messages.schema;
-                    let mut messages: Vec<DecodedMessage> = 
Vec::with_capacity(count);
-                    for message in produced_messages.messages {
-                        let Ok(payload) = 
schema.try_into_payload(message.payload) else {
-                            error!(
-                                "Failed to decode message payload with schema: 
{} for source connector with ID: {plugin_id}",
-                                produced_messages.schema
-                            );
-                            continue;
-                        };
-
-                        debug!(
-                            "Source connector with ID: {plugin_id}] received 
message: {number} | schema: {schema} | payload: {payload}"
-                        );
-                        messages.push(DecodedMessage {
-                            id: message.id,
-                            offset: None,
-                            headers: message.headers,
-                            checksum: message.checksum,
-                            timestamp: message.timestamp,
-                            origin_timestamp: message.origin_timestamp,
-                            payload,
-                        });
-                        number += 1;
-                    }
-
-                    let Ok(iggy_messages) = process_messages(
-                        plugin_id,
-                        &encoder,
-                        &topic_metadata,
-                        messages,
-                        &plugin.transforms,
-                    ) else {
-                        let error_msg = format!(
-                            "Failed to process {count} messages by source 
connector with ID: {plugin_id} before sending them to stream: {}, topic: {}.",
-                            producer.stream(),
-                            producer.topic()
-                        );
-                        error!("{error_msg}");
-                        context
-                            .metrics
-                            .increment_errors(&plugin_key, 
ConnectorType::Source);
-                        context.sources.set_error(&plugin_key, 
&error_msg).await;
-                        continue;
-                    };
-
-                    if let Err(error) = producer.send(iggy_messages).await {
-                        let error_msg = format!(
-                            "Failed to send {count} messages to stream: {}, 
topic: {} by source connector with ID: {plugin_id}. {error}",
-                            producer.stream(),
-                            producer.topic(),
-                        );
-                        error!("{error_msg}");
-                        context
-                            .metrics
-                            .increment_errors(&plugin_key, 
ConnectorType::Source);
-                        context.sources.set_error(&plugin_key, 
&error_msg).await;
-                        continue;
-                    }
-
-                    context
-                        .metrics
-                        .increment_messages_sent(&plugin_key, count as u64);
-
-                    if plugin.verbose {
-                        info!(
-                            "Sent {count} messages to stream: {}, topic: {} by 
source connector with ID: {plugin_id}",
-                            producer.stream(),
-                            producer.topic()
-                        );
-                    } else {
-                        debug!(
-                            "Sent {count} messages to stream: {}, topic: {} by 
source connector with ID: {plugin_id}",
-                            producer.stream(),
-                            producer.topic()
-                        );
-                    }
+            let Some(producer_wrapper) = plugin.producer else {
+                error!("Producer not initialized for source connector with ID: 
{plugin_id}");
+                continue;
+            };
 
-                    let Some(state) = produced_messages.state else {
-                        debug!("No state provided for source connector with 
ID: {plugin_id}");
-                        continue;
-                    };
-
-                    match &plugin.state_storage {
-                        StateStorage::File(file) => {
-                            if let Err(error) = file.save(state).await {
-                                let error_msg = format!(
-                                    "Failed to save state for source connector 
with ID: {plugin_id}. {error}"
-                                );
-                                error!("{error_msg}");
-                                context.sources.set_error(&plugin_key, 
&error_msg).await;
-                                continue;
-                            }
-                            debug!("State saved for source connector with ID: 
{plugin_id}");
-                        }
-                    }
-                }
+            let handler_tasks = spawn_source_handler(
+                plugin_id,
+                &plugin_key,
+                plugin.verbose,
+                producer_wrapper.producer,
+                producer_wrapper.encoder,
+                plugin.transforms,
+                plugin.state_storage,
+                source.callback,
+                context.clone(),
+            );
 
-                info!("Source connector with ID: {plugin_id} stopped.");
-                context
-                    .sources
-                    .update_status(
-                        &plugin_key,
-                        ConnectorStatus::Stopped,
-                        Some(&context.metrics),
-                    )
-                    .await;
-            });
-            handler_tasks.push(handler_task);
+            handles.push((plugin_key, handler_tasks));
         }
     }
-    handler_tasks
+    handles
 }
 
 fn process_messages(
@@ -475,7 +538,7 @@ fn process_messages(
     Ok(iggy_messages)
 }
 
-extern "C" fn handle_produced_messages(
+pub(crate) extern "C" fn handle_produced_messages(
     plugin_id: u32,
     messages_ptr: *const u8,
     messages_len: usize,
diff --git a/core/integration/tests/connectors/postgres/mod.rs 
b/core/integration/tests/connectors/postgres/mod.rs
index cb16dfbfb..c91abf9ab 100644
--- a/core/integration/tests/connectors/postgres/mod.rs
+++ b/core/integration/tests/connectors/postgres/mod.rs
@@ -19,6 +19,7 @@
 
 mod postgres_sink;
 mod postgres_source;
+mod restart;
 
 use crate::connectors::TestMessage;
 use serde::Deserialize;
diff --git a/core/integration/tests/connectors/postgres/restart.rs 
b/core/integration/tests/connectors/postgres/restart.rs
new file mode 100644
index 000000000..97d9f2422
--- /dev/null
+++ b/core/integration/tests/connectors/postgres/restart.rs
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::{POLL_ATTEMPTS, POLL_INTERVAL_MS, TEST_MESSAGE_COUNT};
+use crate::connectors::fixtures::{PostgresOps, PostgresSinkFixture};
+use crate::connectors::{TestMessage, create_test_messages};
+use bytes::Bytes;
+use iggy::prelude::{IggyMessage, Partitioning};
+use iggy_binary_protocol::MessageClient;
+use iggy_common::Identifier;
+use iggy_connector_sdk::api::{ConnectorStatus, SinkInfoResponse};
+use integration::harness::seeds;
+use integration::iggy_harness;
+use reqwest::Client;
+use std::time::Duration;
+use tokio::time::sleep;
+
+const API_KEY: &str = "test-api-key";
+const SINK_TABLE: &str = "iggy_messages";
+const SINK_KEY: &str = "postgres";
+
+type SinkRow = (i64, String, String, Vec<u8>);
+
+#[iggy_harness(
+    server(connectors_runtime(config_path = 
"tests/connectors/postgres/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn restart_sink_connector_continues_processing(
+    harness: &TestHarness,
+    fixture: PostgresSinkFixture,
+) {
+    let client = harness.root_client().await.unwrap();
+    let api_url = harness
+        .connectors_runtime()
+        .expect("connector runtime should be available")
+        .http_url();
+    let http = Client::new();
+    let pool = fixture.create_pool().await.expect("Failed to create pool");
+
+    fixture.wait_for_table(&pool, SINK_TABLE).await;
+
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+
+    wait_for_sink_status(&http, &api_url, ConnectorStatus::Running).await;
+
+    let first_batch = create_test_messages(TEST_MESSAGE_COUNT);
+    let mut messages = build_messages(&first_batch, 0);
+    client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::partition_id(0),
+            &mut messages,
+        )
+        .await
+        .expect("Failed to send first batch");
+
+    let query = format!(
+        "SELECT iggy_offset, iggy_stream, iggy_topic, payload FROM 
{SINK_TABLE} ORDER BY iggy_offset"
+    );
+    let rows: Vec<SinkRow> = fixture
+        .fetch_rows_as(&pool, &query, TEST_MESSAGE_COUNT)
+        .await
+        .expect("Failed to fetch first batch rows");
+
+    assert_eq!(
+        rows.len(),
+        TEST_MESSAGE_COUNT,
+        "Expected {TEST_MESSAGE_COUNT} rows before restart"
+    );
+
+    let resp = http
+        .post(format!("{api_url}/sinks/{SINK_KEY}/restart"))
+        .header("api-key", API_KEY)
+        .send()
+        .await
+        .expect("Failed to call restart endpoint");
+
+    assert_eq!(
+        resp.status().as_u16(),
+        204,
+        "Restart endpoint should return 204 No Content"
+    );
+
+    wait_for_sink_status(&http, &api_url, ConnectorStatus::Running).await;
+
+    let second_batch = create_test_messages(TEST_MESSAGE_COUNT);
+    let mut messages = build_messages(&second_batch, TEST_MESSAGE_COUNT);
+    client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::partition_id(0),
+            &mut messages,
+        )
+        .await
+        .expect("Failed to send second batch");
+
+    let total_expected = TEST_MESSAGE_COUNT * 2;
+    let rows: Vec<SinkRow> = fixture
+        .fetch_rows_as(&pool, &query, total_expected)
+        .await
+        .expect("Failed to fetch rows after restart");
+
+    assert!(
+        rows.len() >= total_expected,
+        "Expected at least {total_expected} rows after restart, got {}",
+        rows.len()
+    );
+}
+
+#[iggy_harness(
+    server(connectors_runtime(config_path = 
"tests/connectors/postgres/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn parallel_restart_requests_should_not_break_connector(
+    harness: &TestHarness,
+    fixture: PostgresSinkFixture,
+) {
+    let client = harness.root_client().await.unwrap();
+    let api_url = harness
+        .connectors_runtime()
+        .expect("connector runtime should be available")
+        .http_url();
+    let http = Client::new();
+    let pool = fixture.create_pool().await.expect("Failed to create pool");
+
+    fixture.wait_for_table(&pool, SINK_TABLE).await;
+
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+
+    wait_for_sink_status(&http, &api_url, ConnectorStatus::Running).await;
+
+    let mut tasks = Vec::new();
+    for _ in 0..5 {
+        let http = http.clone();
+        let url = format!("{api_url}/sinks/{SINK_KEY}/restart");
+        tasks.push(tokio::spawn(async move {
+            http.post(&url)
+                .header("api-key", API_KEY)
+                .send()
+                .await
+                .expect("Failed to call restart endpoint")
+        }));
+    }
+
+    let responses = futures::future::join_all(tasks).await;
+    for resp in responses {
+        let resp = resp.expect("Task panicked");
+        assert_eq!(
+            resp.status().as_u16(),
+            204,
+            "All restart requests should return 204"
+        );
+    }
+
+    wait_for_sink_status(&http, &api_url, ConnectorStatus::Running).await;
+
+    let batch = create_test_messages(TEST_MESSAGE_COUNT);
+    let mut messages = build_messages(&batch, 0);
+    client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::partition_id(0),
+            &mut messages,
+        )
+        .await
+        .expect("Failed to send messages after parallel restarts");
+
+    let query = format!(
+        "SELECT iggy_offset, iggy_stream, iggy_topic, payload FROM 
{SINK_TABLE} ORDER BY iggy_offset"
+    );
+    let rows: Vec<SinkRow> = fixture
+        .fetch_rows_as(&pool, &query, TEST_MESSAGE_COUNT)
+        .await
+        .expect("Failed to fetch rows after parallel restarts");
+
+    assert!(
+        rows.len() >= TEST_MESSAGE_COUNT,
+        "Expected at least {TEST_MESSAGE_COUNT} rows after parallel restarts, 
got {}",
+        rows.len()
+    );
+}
+
+async fn wait_for_sink_status(
+    http: &Client,
+    api_url: &str,
+    expected: ConnectorStatus,
+) -> SinkInfoResponse {
+    for _ in 0..POLL_ATTEMPTS {
+        if let Ok(resp) = http
+            .get(format!("{api_url}/sinks/{SINK_KEY}"))
+            .header("api-key", API_KEY)
+            .send()
+            .await
+            && let Ok(info) = resp.json::<SinkInfoResponse>().await
+            && info.status == expected
+        {
+            return info;
+        }
+        sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
+    }
+    panic!("Sink connector did not reach {expected:?} status in time");
+}
+
+fn build_messages(messages_data: &[TestMessage], id_offset: usize) -> 
Vec<IggyMessage> {
+    messages_data
+        .iter()
+        .enumerate()
+        .map(|(i, msg)| {
+            let payload = serde_json::to_vec(msg).expect("Failed to serialize 
message");
+            IggyMessage::builder()
+                .id((id_offset + i + 1) as u128)
+                .payload(Bytes::from(payload))
+                .build()
+                .expect("Failed to build message")
+        })
+        .collect()
+}


Reply via email to