seokjin0414 commented on code in PR #2781:
URL: https://github.com/apache/iggy/pull/2781#discussion_r2896532894


##########
core/connectors/runtime/src/manager/sink.rs:
##########
@@ -96,6 +107,154 @@ impl SinkManager {
             sink.info.last_error = Some(ConnectorError::new(error_message));
         }
     }
+
+    /// Stops the sink connector registered under `key`.
+    ///
+    /// Signals shutdown to all consumer tasks, waits (bounded) for each of
+    /// them to finish, closes the plugin container, and finally marks the
+    /// connector as stopped.
+    ///
+    /// # Errors
+    ///
+    /// Returns `RuntimeError::SinkNotFound` when no sink is registered for
+    /// `key`.
+    pub async fn stop_connector(
+        &self,
+        key: &str,
+        metrics: &Arc<Metrics>,
+    ) -> Result<(), RuntimeError> {
+        let details_arc = self
+            .sinks
+            .get(key)
+            .map(|e| e.value().clone())
+            .ok_or_else(|| RuntimeError::SinkNotFound(key.to_string()))?;
+
+        // Take ownership of the shutdown channel and task handles inside a
+        // short lock scope, so the mutex is not held across the awaits below.
+        let (shutdown_tx, task_handles, plugin_id, container) = {
+            let mut details = details_arc.lock().await;
+            (
+                details.shutdown_tx.take(),
+                std::mem::take(&mut details.task_handles),
+                details.info.id,
+                details.container.clone(),
+            )
+        };
+
+        // A send error only means every receiver is gone, i.e. all consumer
+        // tasks already exited — safe to ignore.
+        if let Some(tx) = shutdown_tx {
+            let _ = tx.send(());
+        }
+
+        for handle in task_handles {
+            // Bound the wait so a wedged consumer task cannot hang shutdown
+            // forever; report it instead of silently dropping the handle.
+            if tokio::time::timeout(Duration::from_secs(5), handle)
+                .await
+                .is_err()
+            {
+                error!(
+                    "Timed out waiting for a consumer task of sink connector with ID: {plugin_id} for plugin: {key} to stop."
+                );
+            }
+        }
+
+        if let Some(container) = &container {
+            info!("Closing sink connector with ID: {plugin_id} for plugin: {key}");
+            (container.iggy_sink_close)(plugin_id);
+            info!("Closed sink connector with ID: {plugin_id} for plugin: {key}");
+        }
+
+        {
+            let mut details = details_arc.lock().await;
+            let old_status = details.info.status;
+            details.info.status = ConnectorStatus::Stopped;
+            details.info.last_error = None;
+            // Only decrement the gauge on an actual Running -> Stopped
+            // transition, so repeated stops cannot drive the metric negative.
+            if old_status == ConnectorStatus::Running {
+                metrics.decrement_sinks_running();
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Starts the sink connector registered under `key` using `config`.
+    ///
+    /// Initializes the plugin with a fresh plugin ID, creates the Iggy
+    /// consumers, spawns one consume task per consumer, and records the
+    /// running state (shutdown channel, task handles, config) on the
+    /// connector details.
+    ///
+    /// # Errors
+    ///
+    /// Returns `RuntimeError::SinkNotFound` when no sink is registered for
+    /// `key`, `RuntimeError::InvalidConfiguration` when no plugin container
+    /// has been loaded for it, and propagates initialization and consumer
+    /// setup errors as-is.
+    pub async fn start_connector(
+        &self,
+        key: &str,
+        config: &SinkConfig,
+        iggy_client: &IggyClient,
+        metrics: &Arc<Metrics>,
+    ) -> Result<(), RuntimeError> {
+        let details_arc = self
+            .sinks
+            .get(key)
+            .map(|e| e.value().clone())
+            .ok_or_else(|| RuntimeError::SinkNotFound(key.to_string()))?;
+
+        let container = {
+            let details = details_arc.lock().await;
+            details.container.clone().ok_or_else(|| {
+                RuntimeError::InvalidConfiguration(format!("No container loaded for sink: {key}"))
+            })?
+        };
+
+        // Each (re)start gets a fresh plugin ID so the plugin side can tell
+        // instances apart.
+        let plugin_id = PLUGIN_ID.fetch_add(1, Ordering::SeqCst);
+
+        sink::init_sink(
+            &container,
+            &config.plugin_config.clone().unwrap_or_default(),
+            plugin_id,
+        )?;
+        info!("Sink connector with ID: {plugin_id} for plugin: {key} initialized successfully.");
+
+        let consumers = sink::setup_sink_consumers(key, config, iggy_client).await?;
+
+        let (shutdown_tx, shutdown_rx) = watch::channel(());
+        let callback = container.iggy_sink_consume;
+        let verbose = config.verbose;
+        let mut task_handles = Vec::new();
+
+        for (consumer, decoder, batch_size, transforms) in consumers {
+            let plugin_key = key.to_string();
+            let metrics_clone = metrics.clone();
+            // Every task gets its own receiver of the shared shutdown channel.
+            let shutdown_rx = shutdown_rx.clone();
+
+            let handle = tokio::spawn(async move {
+                if let Err(error) = sink::consume_messages(
+                    plugin_id,
+                    decoder,
+                    batch_size,
+                    callback,
+                    transforms,
+                    consumer,
+                    verbose,
+                    &plugin_key,
+                    &metrics_clone,
+                    shutdown_rx,
+                )
+                .await
+                {
+                    error!(
+                        "Failed to consume messages for sink connector with ID: {plugin_id}: {error}"
+                    );
+                }
+            });
+            task_handles.push(handle);
+        }
+
+        {
+            let mut details = details_arc.lock().await;
+            let old_status = details.info.status;
+            details.info.id = plugin_id;
+            details.info.status = ConnectorStatus::Running;
+            details.info.last_error = None;
+            details.config = config.clone();
+            details.shutdown_tx = Some(shutdown_tx);
+            details.task_handles = task_handles;
+            // Mirror stop_connector: only bump the gauge on an actual
+            // transition into Running, so starting an already-running
+            // connector cannot double-count it in the metrics.
+            if old_status != ConnectorStatus::Running {
+                metrics.increment_sinks_running();
+            }
+        }
+
+        Ok(())
+    }
+
+    pub async fn restart_connector(

Review Comment:
   Changed to try_lock() — if a restart is already in progress, the call returns Ok immediately instead of queuing behind it.
   Applied to restart_connector() only; stop_connector_with_guard() keeps lock().await, since shutdown must wait for completion.
   (e06aeef6)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to