This is an automated email from the ASF dual-hosted git repository.

maciej pushed a commit to branch connectors-runtime-metrics
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit edc5f6b78849064c8e7177fac16130ece63678b0
Author: Maciej Modzelewski <[email protected]>
AuthorDate: Wed Jan 28 15:23:33 2026 +0100

    feat(connectors): add Prometheus metrics and stats endpoints
    
    - Add /metrics endpoint for Prometheus scraping (configurable)
    - Add /stats endpoint returning JSON runtime statistics
    - Track runtime gauges: sources/sinks total and running counts
    - Track per-connector counters: messages produced/sent (source),
      consumed/processed (sink), and errors
    - Add cpu/memory usage and uptime to stats response
---
 Cargo.lock                                    |   2 +
 Cargo.toml                                    |   1 +
 core/connectors/runtime/Cargo.toml            |   2 +
 core/connectors/runtime/README.md             |  25 ++
 core/connectors/runtime/config.toml           |   4 +
 core/connectors/runtime/runtime.http          |   6 +
 core/connectors/runtime/src/api/config.rs     |  72 +++-
 core/connectors/runtime/src/api/mod.rs        |  22 +-
 core/connectors/runtime/src/context.rs        |  17 +-
 core/connectors/runtime/src/main.rs           |   2 +
 core/connectors/runtime/src/manager/sink.rs   |  18 +-
 core/connectors/runtime/src/manager/source.rs |  18 +-
 core/connectors/runtime/src/metrics.rs        | 478 ++++++++++++++++++++++++++
 core/connectors/runtime/src/sink.rs           |  39 ++-
 core/connectors/runtime/src/source.rs         |  26 +-
 core/connectors/runtime/src/stats.rs          | 256 ++++++++++++++
 core/server/Cargo.toml                        |   2 +-
 17 files changed, 971 insertions(+), 19 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 5b992c303..8353cef63 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4565,6 +4565,7 @@ dependencies = [
  "opentelemetry-semantic-conventions",
  "opentelemetry_sdk",
  "postcard",
+ "prometheus-client",
  "reqwest",
  "reqwest-middleware",
  "reqwest-retry",
@@ -4574,6 +4575,7 @@ dependencies = [
  "serde_with",
  "serde_yaml_ng",
  "strum",
+ "sysinfo 0.38.0",
  "tempfile",
  "thiserror 2.0.18",
  "tokio",
diff --git a/Cargo.toml b/Cargo.toml
index e2aba3ca2..e6797945c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -179,6 +179,7 @@ passterm = "=2.0.1"
 postcard = { version = "1.1.3", features = ["alloc"] }
 predicates = "3.1.3"
 proc-macro2 = "1"
+prometheus-client = "0.24.0"
 quinn = "0.11.9"
 quote = "1"
 rand = "0.9.2"
diff --git a/core/connectors/runtime/Cargo.toml b/core/connectors/runtime/Cargo.toml
index 970c1a7e8..bdb8bd307 100644
--- a/core/connectors/runtime/Cargo.toml
+++ b/core/connectors/runtime/Cargo.toml
@@ -56,6 +56,7 @@ opentelemetry-otlp = { workspace = true }
 opentelemetry-semantic-conventions = { workspace = true }
 opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] }
 postcard = { workspace = true }
+prometheus-client = { workspace = true }
 reqwest = { workspace = true }
 reqwest-middleware = { workspace = true }
 reqwest-retry = { workspace = true }
@@ -65,6 +66,7 @@ serde_json = { workspace = true }
 serde_with = { workspace = true }
 serde_yaml_ng = { workspace = true }
 strum = { workspace = true }
+sysinfo = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
 toml = { workspace = true }
diff --git a/core/connectors/runtime/README.md b/core/connectors/runtime/README.md
index 8b0d2584c..6433fb822 100644
--- a/core/connectors/runtime/README.md
+++ b/core/connectors/runtime/README.md
@@ -134,6 +134,10 @@ exposed_headers = [""]
 allow_credentials = false
 allow_private_network = false
 
+[http.metrics] # Optional Prometheus metrics configuration
+enabled = false
+endpoint = "/metrics"
+
 [http.tls] # Optional TLS configuration for HTTP API
 enabled = false
 cert_file = "core/certs/iggy_cert.pem"
@@ -144,6 +148,8 @@ Currently, it does expose the following endpoints:
 
 - `GET /`: welcome message.
 - `GET /health`: health status of the runtime.
+- `GET /stats`: runtime statistics including process info, memory/CPU usage, and connector status.
+- `GET /metrics`: Prometheus-formatted metrics (when `http.metrics.enabled` is `true`).
 - `GET /sinks`: list of sinks.
 - `GET /sinks/{key}`: sink details.
 - `GET /sinks/{key}/configs`: list of configuration versions for the sink.
@@ -180,3 +186,22 @@ endpoint = "http://localhost:4317"
 transport = "grpc" # Options: "grpc", "http"
 endpoint = "http://localhost:4317"
 ```
+
+## Metrics
+
+The runtime exposes Prometheus-compatible metrics via the `/metrics` endpoint when enabled. The following metrics are available:
+
+### Runtime Gauges
+
+- `iggy_connectors_sources_total`: Total configured source connectors
+- `iggy_connectors_sources_running`: Sources currently in Running status
+- `iggy_connectors_sinks_total`: Total configured sink connectors
+- `iggy_connectors_sinks_running`: Sinks currently in Running status
+
+### Per-Connector Counters (labeled with `connector_key` and `connector_type`)
+
+- `iggy_connector_messages_produced_total`: Messages received from source plugin
+- `iggy_connector_messages_sent_total`: Messages sent to Iggy (source)
+- `iggy_connector_messages_consumed_total`: Messages consumed from Iggy (sink)
+- `iggy_connector_messages_processed_total`: Messages processed and sent to sink plugin
+- `iggy_connector_errors_total`: Errors encountered
diff --git a/core/connectors/runtime/config.toml b/core/connectors/runtime/config.toml
index abfa1ab50..247a67e6b 100644
--- a/core/connectors/runtime/config.toml
+++ b/core/connectors/runtime/config.toml
@@ -29,6 +29,10 @@ exposed_headers = [""]
 allow_credentials = false
 allow_private_network = false
 
+[http.metrics] # Optional Prometheus metrics configuration
+enabled = false
+endpoint = "/metrics"
+
 [http.tls] # Optional TLS configuration for HTTP API
 enabled = false
 cert_file = "core/certs/iggy_cert.pem"
diff --git a/core/connectors/runtime/runtime.http b/core/connectors/runtime/runtime.http
index 7c08bef4f..3205c7b8b 100644
--- a/core/connectors/runtime/runtime.http
+++ b/core/connectors/runtime/runtime.http
@@ -27,6 +27,12 @@ GET {{url}}
 ###
 GET {{url}}/health
 
+###
+GET {{url}}/stats
+
+###
+GET {{url}}/metrics
+
 ###
 GET {{url}}/sinks
 
diff --git a/core/connectors/runtime/src/api/config.rs b/core/connectors/runtime/src/api/config.rs
index 8efdb8ae5..596a48c60 100644
--- a/core/connectors/runtime/src/api/config.rs
+++ b/core/connectors/runtime/src/api/config.rs
@@ -39,6 +39,13 @@ pub struct HttpConfig {
     pub api_key: String,
     pub cors: HttpCorsConfig,
     pub tls: HttpTlsConfig,
+    pub metrics: HttpMetricsConfig,
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize, ConfigEnv)]
+pub struct HttpMetricsConfig {
+    pub enabled: bool,
+    pub endpoint: String,
 }
 
 #[derive(Debug, Default, Deserialize, Serialize, Clone, ConfigEnv)]
@@ -135,6 +142,25 @@ pub fn configure_cors(config: &HttpCorsConfig) -> CorsLayer {
         .allow_private_network(config.allow_private_network)
 }
 
+impl Default for HttpMetricsConfig {
+    fn default() -> Self {
+        Self {
+            enabled: false,
+            endpoint: "/metrics".to_owned(),
+        }
+    }
+}
+
+impl std::fmt::Display for HttpMetricsConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "{{ enabled: {}, endpoint: {} }}",
+            self.enabled, self.endpoint
+        )
+    }
+}
+
 impl Default for HttpConfig {
     fn default() -> Self {
         Self {
@@ -143,6 +169,7 @@ impl Default for HttpConfig {
             api_key: "".to_owned(),
             cors: HttpCorsConfig::default(),
             tls: HttpTlsConfig::default(),
+            metrics: HttpMetricsConfig::default(),
         }
     }
 }
@@ -151,8 +178,8 @@ impl std::fmt::Display for HttpConfig {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "{{ address: {}, api_key: {}, cors: {}, tls: {} }}",
-            self.address, self.api_key, self.cors, self.tls
+            "{{ address: {}, api_key: {}, cors: {}, tls: {}, metrics: {} }}",
+            self.address, self.api_key, self.cors, self.tls, self.metrics
         )
     }
 }
@@ -182,3 +209,44 @@ impl std::fmt::Display for HttpCorsConfig {
         )
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_http_metrics_config_default() {
+        let config = HttpMetricsConfig::default();
+
+        assert!(!config.enabled);
+        assert_eq!(config.endpoint, "/metrics");
+    }
+
+    #[test]
+    fn test_http_metrics_config_display() {
+        let config = HttpMetricsConfig {
+            enabled: true,
+            endpoint: "/custom-metrics".to_owned(),
+        };
+
+        let display = format!("{}", config);
+        assert!(display.contains("enabled: true"));
+        assert!(display.contains("endpoint: /custom-metrics"));
+    }
+
+    #[test]
+    fn test_http_config_default_includes_metrics() {
+        let config = HttpConfig::default();
+
+        assert!(!config.metrics.enabled);
+        assert_eq!(config.metrics.endpoint, "/metrics");
+    }
+
+    #[test]
+    fn test_http_config_display_includes_metrics() {
+        let config = HttpConfig::default();
+
+        let display = format!("{}", config);
+        assert!(display.contains("metrics:"));
+    }
+}
diff --git a/core/connectors/runtime/src/api/mod.rs b/core/connectors/runtime/src/api/mod.rs
index 85b2d4400..d8dea6b82 100644
--- a/core/connectors/runtime/src/api/mod.rs
+++ b/core/connectors/runtime/src/api/mod.rs
@@ -18,8 +18,9 @@
  */
 
 use crate::context::RuntimeContext;
+use crate::stats;
 use auth::resolve_api_key;
-use axum::{Json, Router, middleware, routing::get};
+use axum::{Json, Router, extract::State, middleware, routing::get};
 use axum_server::tls_rustls::RustlsConfig;
 use config::{HttpConfig, configure_cors};
 use std::{net::SocketAddr, path::PathBuf, sync::Arc};
@@ -41,12 +42,21 @@ pub async fn init(config: &HttpConfig, context: Arc<RuntimeContext>) {
         return;
     }
 
+    let mut system_router = Router::new().route("/stats", get(get_stats));
+
+    if config.metrics.enabled {
+        system_router = system_router.route(&config.metrics.endpoint, get(get_metrics));
+    }
+
+    let system_router = system_router.with_state(context.clone());
+
     let mut app = Router::new()
         .route("/", get(|| async { "Connector Runtime API" }))
         .route(
             "/health",
             get(|| async { Json(serde_json::json!({ "status": "healthy" })) }),
         )
+        .merge(system_router)
         .merge(sink::router(context.clone()))
         .merge(source::router(context.clone()));
 
@@ -111,3 +121,13 @@ pub async fn init(config: &HttpConfig, context: Arc<RuntimeContext>) {
         }
     });
 }
+
+async fn get_metrics(State(context): State<Arc<RuntimeContext>>) -> String {
+    context.metrics.get_formatted_output()
+}
+
+async fn get_stats(
+    State(context): State<Arc<RuntimeContext>>,
+) -> Json<stats::ConnectorRuntimeStats> {
+    Json(stats::get_runtime_stats(&context).await)
+}
diff --git a/core/connectors/runtime/src/context.rs b/core/connectors/runtime/src/context.rs
index fe2a8e034..20d9906ca 100644
--- a/core/connectors/runtime/src/context.rs
+++ b/core/connectors/runtime/src/context.rs
@@ -19,6 +19,7 @@
 use crate::configs::connectors::{ConnectorsConfigProvider, SinkConfig, SourceConfig};
 use crate::configs::runtime::ConnectorsRuntimeConfig;
 use crate::manager::status::ConnectorError;
+use crate::metrics::Metrics;
 use crate::{
     SinkConnectorWrapper, SourceConnectorWrapper,
     manager::{
@@ -27,6 +28,7 @@ use crate::{
         status::ConnectorStatus,
     },
 };
+use iggy_common::IggyTimestamp;
 use std::collections::HashMap;
 use std::sync::Arc;
 use tracing::error;
@@ -36,6 +38,8 @@ pub struct RuntimeContext {
     pub sources: SourceManager,
     pub api_key: String,
     pub config_provider: Arc<dyn ConnectorsConfigProvider>,
+    pub metrics: Arc<Metrics>,
+    pub start_time: IggyTimestamp,
 }
 
 pub fn init(
@@ -46,11 +50,20 @@ pub fn init(
     source_wrappers: &[SourceConnectorWrapper],
     config_provider: Box<dyn ConnectorsConfigProvider>,
 ) -> RuntimeContext {
+    let metrics = Arc::new(Metrics::init());
+    let sinks = SinkManager::new(map_sinks(sinks_config, sink_wrappers));
+    let sources = SourceManager::new(map_sources(sources_config, source_wrappers));
+
+    metrics.set_sinks_total(sinks_config.len() as u32);
+    metrics.set_sources_total(sources_config.len() as u32);
+
     RuntimeContext {
-        sinks: SinkManager::new(map_sinks(sinks_config, sink_wrappers)),
-        sources: SourceManager::new(map_sources(sources_config, source_wrappers)),
+        sinks,
+        sources,
         api_key: config.http.api_key.to_owned(),
         config_provider: Arc::from(config_provider),
+        metrics,
+        start_time: IggyTimestamp::now(),
     }
 }
 
diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs
index 657ed646a..b71bd5eef 100644
--- a/core/connectors/runtime/src/main.rs
+++ b/core/connectors/runtime/src/main.rs
@@ -48,9 +48,11 @@ pub(crate) mod context;
 pub(crate) mod error;
 mod log;
 mod manager;
+pub(crate) mod metrics;
 mod sink;
 mod source;
 mod state;
+pub(crate) mod stats;
 mod stream;
 mod transform;
 
diff --git a/core/connectors/runtime/src/manager/sink.rs b/core/connectors/runtime/src/manager/sink.rs
index 93ad280bd..495693716 100644
--- a/core/connectors/runtime/src/manager/sink.rs
+++ b/core/connectors/runtime/src/manager/sink.rs
@@ -18,6 +18,7 @@
  */
 use super::status::{ConnectorError, ConnectorStatus};
 use crate::configs::connectors::{ConfigFormat, SinkConfig};
+use crate::metrics::Metrics;
 use dashmap::DashMap;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -63,13 +64,28 @@ impl SinkManager {
         results
     }
 
-    pub async fn update_status(&self, key: &str, status: ConnectorStatus) {
+    pub async fn update_status(
+        &self,
+        key: &str,
+        status: ConnectorStatus,
+        metrics: Option<&Arc<Metrics>>,
+    ) {
         if let Some(sink) = self.sinks.get(key) {
             let mut sink = sink.lock().await;
+            let old_status = sink.info.status;
             sink.info.status = status;
             if matches!(status, ConnectorStatus::Running | ConnectorStatus::Stopped) {
                 sink.info.last_error = None;
             }
+            if let Some(metrics) = metrics {
+                if old_status != ConnectorStatus::Running && status == ConnectorStatus::Running {
+                    metrics.increment_sinks_running();
+                } else if old_status == ConnectorStatus::Running
+                    && status != ConnectorStatus::Running
+                {
+                    metrics.decrement_sinks_running();
+                }
+            }
         }
     }
 
diff --git a/core/connectors/runtime/src/manager/source.rs b/core/connectors/runtime/src/manager/source.rs
index 801a567e7..051668640 100644
--- a/core/connectors/runtime/src/manager/source.rs
+++ b/core/connectors/runtime/src/manager/source.rs
@@ -18,6 +18,7 @@
  */
 use super::status::{ConnectorError, ConnectorStatus};
 use crate::configs::connectors::{ConfigFormat, SourceConfig};
+use crate::metrics::Metrics;
 use dashmap::DashMap;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -63,13 +64,28 @@ impl SourceManager {
         results
     }
 
-    pub async fn update_status(&self, key: &str, status: ConnectorStatus) {
+    pub async fn update_status(
+        &self,
+        key: &str,
+        status: ConnectorStatus,
+        metrics: Option<&Arc<Metrics>>,
+    ) {
         if let Some(source) = self.sources.get(key) {
             let mut source = source.lock().await;
+            let old_status = source.info.status;
             source.info.status = status;
             if matches!(status, ConnectorStatus::Running | ConnectorStatus::Stopped) {
                 source.info.last_error = None;
             }
+            if let Some(metrics) = metrics {
+                if old_status != ConnectorStatus::Running && status == ConnectorStatus::Running {
+                    metrics.increment_sources_running();
+                } else if old_status == ConnectorStatus::Running
+                    && status != ConnectorStatus::Running
+                {
+                    metrics.decrement_sources_running();
+                }
+            }
         }
     }
 
diff --git a/core/connectors/runtime/src/metrics.rs b/core/connectors/runtime/src/metrics.rs
new file mode 100644
index 000000000..8d9c29cdb
--- /dev/null
+++ b/core/connectors/runtime/src/metrics.rs
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use prometheus_client::encoding::text::encode;
+use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue};
+use prometheus_client::metrics::counter::Counter;
+use prometheus_client::metrics::family::Family;
+use prometheus_client::metrics::gauge::Gauge;
+use prometheus_client::registry::Registry;
+use std::sync::Arc;
+use tracing::error;
+
+#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
+pub struct ConnectorLabels {
+    pub connector_key: String,
+    pub connector_type: ConnectorType,
+}
+
+#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)]
+pub enum ConnectorType {
+    Source,
+    Sink,
+}
+
+#[derive(Debug, Clone)]
+pub struct Metrics {
+    registry: Arc<Registry>,
+    sources_total: Gauge,
+    sources_running: Gauge,
+    sinks_total: Gauge,
+    sinks_running: Gauge,
+    messages_produced: Family<ConnectorLabels, Counter>,
+    messages_sent: Family<ConnectorLabels, Counter>,
+    messages_consumed: Family<ConnectorLabels, Counter>,
+    messages_processed: Family<ConnectorLabels, Counter>,
+    errors: Family<ConnectorLabels, Counter>,
+}
+
+impl Metrics {
+    pub fn init() -> Self {
+        let mut registry = Registry::default();
+
+        let sources_total = Gauge::default();
+        let sources_running = Gauge::default();
+        let sinks_total = Gauge::default();
+        let sinks_running = Gauge::default();
+        let messages_produced = Family::<ConnectorLabels, Counter>::default();
+        let messages_sent = Family::<ConnectorLabels, Counter>::default();
+        let messages_consumed = Family::<ConnectorLabels, Counter>::default();
+        let messages_processed = Family::<ConnectorLabels, Counter>::default();
+        let errors = Family::<ConnectorLabels, Counter>::default();
+
+        registry.register(
+            "iggy_connectors_sources_total",
+            "Total configured source connectors",
+            sources_total.clone(),
+        );
+        registry.register(
+            "iggy_connectors_sources_running",
+            "Sources in Running status",
+            sources_running.clone(),
+        );
+        registry.register(
+            "iggy_connectors_sinks_total",
+            "Total configured sink connectors",
+            sinks_total.clone(),
+        );
+        registry.register(
+            "iggy_connectors_sinks_running",
+            "Sinks in Running status",
+            sinks_running.clone(),
+        );
+        registry.register(
+            "iggy_connector_messages_produced_total",
+            "Messages received from source plugin poll",
+            messages_produced.clone(),
+        );
+        registry.register(
+            "iggy_connector_messages_sent_total",
+            "Messages sent to Iggy (source)",
+            messages_sent.clone(),
+        );
+        registry.register(
+            "iggy_connector_messages_consumed_total",
+            "Messages consumed from Iggy (sink)",
+            messages_consumed.clone(),
+        );
+        registry.register(
+            "iggy_connector_messages_processed_total",
+            "Messages processed and sent to sink plugin",
+            messages_processed.clone(),
+        );
+        registry.register(
+            "iggy_connector_errors_total",
+            "Errors encountered",
+            errors.clone(),
+        );
+
+        Self {
+            registry: Arc::new(registry),
+            sources_total,
+            sources_running,
+            sinks_total,
+            sinks_running,
+            messages_produced,
+            messages_sent,
+            messages_consumed,
+            messages_processed,
+            errors,
+        }
+    }
+
+    pub fn get_formatted_output(&self) -> String {
+        let mut buffer = String::new();
+        if let Err(err) = encode(&mut buffer, &self.registry) {
+            error!("Failed to encode metrics: {}", err);
+        }
+        buffer
+    }
+
+    pub fn set_sources_total(&self, count: u32) {
+        self.sources_total.set(count as i64);
+    }
+
+    pub fn set_sinks_total(&self, count: u32) {
+        self.sinks_total.set(count as i64);
+    }
+
+    pub fn get_sources_total(&self) -> u32 {
+        self.sources_total.get() as u32
+    }
+
+    pub fn get_sinks_total(&self) -> u32 {
+        self.sinks_total.get() as u32
+    }
+
+    pub fn get_sources_running(&self) -> u32 {
+        self.sources_running.get() as u32
+    }
+
+    pub fn get_sinks_running(&self) -> u32 {
+        self.sinks_running.get() as u32
+    }
+
+    pub fn increment_sources_running(&self) {
+        self.sources_running.inc();
+    }
+
+    pub fn decrement_sources_running(&self) {
+        self.sources_running.dec();
+    }
+
+    pub fn increment_sinks_running(&self) {
+        self.sinks_running.inc();
+    }
+
+    pub fn decrement_sinks_running(&self) {
+        self.sinks_running.dec();
+    }
+
+    pub fn increment_messages_produced(&self, key: &str, count: u64) {
+        self.messages_produced
+            .get_or_create(&ConnectorLabels {
+                connector_key: key.to_owned(),
+                connector_type: ConnectorType::Source,
+            })
+            .inc_by(count);
+    }
+
+    pub fn increment_messages_sent(&self, key: &str, count: u64) {
+        self.messages_sent
+            .get_or_create(&ConnectorLabels {
+                connector_key: key.to_owned(),
+                connector_type: ConnectorType::Source,
+            })
+            .inc_by(count);
+    }
+
+    pub fn increment_messages_consumed(&self, key: &str, count: u64) {
+        self.messages_consumed
+            .get_or_create(&ConnectorLabels {
+                connector_key: key.to_owned(),
+                connector_type: ConnectorType::Sink,
+            })
+            .inc_by(count);
+    }
+
+    pub fn increment_messages_processed(&self, key: &str, count: u64) {
+        self.messages_processed
+            .get_or_create(&ConnectorLabels {
+                connector_key: key.to_owned(),
+                connector_type: ConnectorType::Sink,
+            })
+            .inc_by(count);
+    }
+
+    pub fn increment_errors(&self, key: &str, connector_type: ConnectorType) {
+        self.errors
+            .get_or_create(&ConnectorLabels {
+                connector_key: key.to_owned(),
+                connector_type,
+            })
+            .inc();
+    }
+
+    pub fn get_messages_produced(&self, key: &str) -> u64 {
+        self.messages_produced
+            .get_or_create(&ConnectorLabels {
+                connector_key: key.to_owned(),
+                connector_type: ConnectorType::Source,
+            })
+            .get()
+    }
+
+    pub fn get_messages_sent(&self, key: &str) -> u64 {
+        self.messages_sent
+            .get_or_create(&ConnectorLabels {
+                connector_key: key.to_owned(),
+                connector_type: ConnectorType::Source,
+            })
+            .get()
+    }
+
+    pub fn get_messages_consumed(&self, key: &str) -> u64 {
+        self.messages_consumed
+            .get_or_create(&ConnectorLabels {
+                connector_key: key.to_owned(),
+                connector_type: ConnectorType::Sink,
+            })
+            .get()
+    }
+
+    pub fn get_messages_processed(&self, key: &str) -> u64 {
+        self.messages_processed
+            .get_or_create(&ConnectorLabels {
+                connector_key: key.to_owned(),
+                connector_type: ConnectorType::Sink,
+            })
+            .get()
+    }
+
+    pub fn get_errors(&self, key: &str, connector_type: ConnectorType) -> u64 {
+        self.errors
+            .get_or_create(&ConnectorLabels {
+                connector_key: key.to_owned(),
+                connector_type,
+            })
+            .get()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_metrics_init() {
+        let metrics = Metrics::init();
+        let output = metrics.get_formatted_output();
+
+        assert!(output.contains("iggy_connectors_sources_total"));
+        assert!(output.contains("iggy_connectors_sources_running"));
+        assert!(output.contains("iggy_connectors_sinks_total"));
+        assert!(output.contains("iggy_connectors_sinks_running"));
+    }
+
+    #[test]
+    fn test_set_sources_total() {
+        let metrics = Metrics::init();
+        metrics.set_sources_total(5);
+
+        let output = metrics.get_formatted_output();
+        assert!(output.contains("iggy_connectors_sources_total 5"));
+    }
+
+    #[test]
+    fn test_set_sinks_total() {
+        let metrics = Metrics::init();
+        metrics.set_sinks_total(3);
+
+        let output = metrics.get_formatted_output();
+        assert!(output.contains("iggy_connectors_sinks_total 3"));
+    }
+
+    #[test]
+    fn test_increment_sources_running() {
+        let metrics = Metrics::init();
+        metrics.increment_sources_running();
+        metrics.increment_sources_running();
+
+        let output = metrics.get_formatted_output();
+        assert!(output.contains("iggy_connectors_sources_running 2"));
+    }
+
+    #[test]
+    fn test_decrement_sources_running() {
+        let metrics = Metrics::init();
+        metrics.increment_sources_running();
+        metrics.increment_sources_running();
+        metrics.decrement_sources_running();
+
+        let output = metrics.get_formatted_output();
+        assert!(output.contains("iggy_connectors_sources_running 1"));
+    }
+
+    #[test]
+    fn test_increment_sinks_running() {
+        let metrics = Metrics::init();
+        metrics.increment_sinks_running();
+
+        let output = metrics.get_formatted_output();
+        assert!(output.contains("iggy_connectors_sinks_running 1"));
+    }
+
+    #[test]
+    fn test_decrement_sinks_running() {
+        let metrics = Metrics::init();
+        metrics.increment_sinks_running();
+        metrics.increment_sinks_running();
+        metrics.decrement_sinks_running();
+
+        let output = metrics.get_formatted_output();
+        assert!(output.contains("iggy_connectors_sinks_running 1"));
+    }
+
+    #[test]
+    fn test_increment_messages_produced() {
+        let metrics = Metrics::init();
+        metrics.increment_messages_produced("test-source", 10);
+
+        let output = metrics.get_formatted_output();
+        assert!(output.contains("iggy_connector_messages_produced_total"));
+        assert!(output.contains("connector_key=\"test-source\""));
+        assert!(output.contains("connector_type=\"Source\""));
+    }
+
+    #[test]
+    fn test_increment_messages_sent() {
+        let metrics = Metrics::init();
+        metrics.increment_messages_sent("test-source", 5);
+
+        let output = metrics.get_formatted_output();
+        assert!(output.contains("iggy_connector_messages_sent_total"));
+        assert!(output.contains("connector_key=\"test-source\""));
+    }
+
+    #[test]
+    fn test_increment_messages_consumed() {
+        let metrics = Metrics::init();
+        metrics.increment_messages_consumed("test-sink", 20);
+
+        let output = metrics.get_formatted_output();
+        assert!(output.contains("iggy_connector_messages_consumed_total"));
+        assert!(output.contains("connector_key=\"test-sink\""));
+        assert!(output.contains("connector_type=\"Sink\""));
+    }
+
+    #[test]
+    fn test_increment_messages_processed() {
+        let metrics = Metrics::init();
+        metrics.increment_messages_processed("test-sink", 15);
+
+        let output = metrics.get_formatted_output();
+        assert!(output.contains("iggy_connector_messages_processed_total"));
+        assert!(output.contains("connector_key=\"test-sink\""));
+    }
+
+    #[test]
+    fn test_increment_errors_source() {
+        let metrics = Metrics::init();
+        metrics.increment_errors("test-source", ConnectorType::Source);
+
+        let output = metrics.get_formatted_output();
+        assert!(output.contains("iggy_connector_errors_total"));
+        assert!(output.contains("connector_key=\"test-source\""));
+        assert!(output.contains("connector_type=\"Source\""));
+    }
+
+    #[test]
+    fn test_increment_errors_sink() {
+        let metrics = Metrics::init();
+        metrics.increment_errors("test-sink", ConnectorType::Sink);
+
+        let output = metrics.get_formatted_output();
+        assert!(output.contains("iggy_connector_errors_total"));
+        assert!(output.contains("connector_key=\"test-sink\""));
+        assert!(output.contains("connector_type=\"Sink\""));
+    }
+
+    #[test]
+    fn test_multiple_connectors() {
+        let metrics = Metrics::init();
+        metrics.increment_messages_produced("source-1", 10);
+        metrics.increment_messages_produced("source-2", 20);
+        metrics.increment_messages_consumed("sink-1", 15);
+
+        let output = metrics.get_formatted_output();
+        assert!(output.contains("connector_key=\"source-1\""));
+        assert!(output.contains("connector_key=\"source-2\""));
+        assert!(output.contains("connector_key=\"sink-1\""));
+    }
+
+    #[test]
+    fn test_cumulative_counts() {
+        let metrics = Metrics::init();
+        metrics.increment_messages_produced("test-source", 10);
+        metrics.increment_messages_produced("test-source", 5);
+
+        let output = metrics.get_formatted_output();
+        // Verify cumulative behavior - the same key should accumulate
+        assert!(output.contains("iggy_connector_messages_produced_total"));
+        assert!(output.contains("connector_key=\"test-source\""));
+        // Check the value is 15 (10 + 5)
+        assert!(output.contains("} 15"));
+    }
+
+    #[test]
+    fn test_get_messages_produced() {
+        let metrics = Metrics::init();
+        metrics.increment_messages_produced("test-source", 10);
+        metrics.increment_messages_produced("test-source", 5);
+
+        assert_eq!(metrics.get_messages_produced("test-source"), 15);
+        assert_eq!(metrics.get_messages_produced("nonexistent"), 0);
+    }
+
+    #[test]
+    fn test_get_messages_sent() {
+        let metrics = Metrics::init();
+        metrics.increment_messages_sent("test-source", 20);
+
+        assert_eq!(metrics.get_messages_sent("test-source"), 20);
+    }
+
+    #[test]
+    fn test_get_messages_consumed() {
+        let metrics = Metrics::init();
+        metrics.increment_messages_consumed("test-sink", 30);
+
+        assert_eq!(metrics.get_messages_consumed("test-sink"), 30);
+    }
+
+    #[test]
+    fn test_get_messages_processed() {
+        let metrics = Metrics::init();
+        metrics.increment_messages_processed("test-sink", 25);
+
+        assert_eq!(metrics.get_messages_processed("test-sink"), 25);
+    }
+
+    #[test]
+    fn test_get_errors() {
+        let metrics = Metrics::init();
+        metrics.increment_errors("test-source", ConnectorType::Source);
+        metrics.increment_errors("test-source", ConnectorType::Source);
+        metrics.increment_errors("test-sink", ConnectorType::Sink);
+
+        assert_eq!(metrics.get_errors("test-source", ConnectorType::Source), 2);
+        assert_eq!(metrics.get_errors("test-sink", ConnectorType::Sink), 1);
+        assert_eq!(metrics.get_errors("nonexistent", ConnectorType::Source), 0);
+    }
+}
diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs
index 40b41d886..0987eb7e5 100644
--- a/core/connectors/runtime/src/sink.rs
+++ b/core/connectors/runtime/src/sink.rs
@@ -21,6 +21,7 @@ use crate::configs::connectors::SinkConfig;
 use crate::context::RuntimeContext;
 use crate::log::LOG_CALLBACK;
 use crate::manager::status::ConnectorStatus;
+use crate::metrics::{ConnectorType, Metrics};
 use crate::{
     PLUGIN_ID, RuntimeError, SinkApi, SinkConnector, SinkConnectorConsumer, SinkConnectorPlugin,
     SinkConnectorWrapper, resolve_plugin_path, transform,
@@ -196,7 +197,11 @@ pub fn consume(sinks: Vec<SinkConnectorWrapper>, context: Arc<RuntimeContext>) {
                 tokio::spawn(async move {
                     context
                         .sinks
-                        .update_status(&plugin_key, ConnectorStatus::Running)
+                        .update_status(
+                            &plugin_key,
+                            ConnectorStatus::Running,
+                            Some(&context.metrics),
+                        )
                         .await;
 
                     if let Err(error) = consume_messages(
@@ -207,6 +212,8 @@ pub fn consume(sinks: Vec<SinkConnectorWrapper>, context: Arc<RuntimeContext>) {
                         consumer.transforms,
                         consumer.consumer,
                         plugin.verbose,
+                        &plugin_key,
+                        &context.metrics,
                     )
                     .await
                     {
@@ -215,6 +222,9 @@ pub fn consume(sinks: Vec<SinkConnectorWrapper>, context: Arc<RuntimeContext>) {
                             plugin.id
                         );
                         error!(err);
+                        context
+                            .metrics
+                            .increment_errors(&plugin_key, ConnectorType::Sink);
                         context.sinks.set_error(&plugin_key, &err).await;
                         return;
                     }
@@ -228,6 +238,7 @@ pub fn consume(sinks: Vec<SinkConnectorWrapper>, context: Arc<RuntimeContext>) {
     }
 }
 
+#[allow(clippy::too_many_arguments)]
 async fn consume_messages(
     plugin_id: u32,
     decoder: Arc<dyn StreamDecoder>,
@@ -236,6 +247,8 @@ async fn consume_messages(
     transforms: Vec<Arc<dyn Transform>>,
     mut consumer: IggyConsumer,
     verbose: bool,
+    plugin_key: &str,
+    metrics: &Arc<Metrics>,
 ) -> Result<(), RuntimeError> {
     info!("Started consuming messages for sink connector with ID: {plugin_id}");
     let batch_size = batch_size as usize;
@@ -261,6 +274,7 @@ async fn consume_messages(
 
         let messages = std::mem::take(&mut batch);
         let messages_count = messages.len();
+        metrics.increment_messages_consumed(plugin_key, messages_count as u64);
         let messages_metadata = MessagesMetadata {
             partition_id,
             current_offset,
@@ -278,7 +292,7 @@ async fn consume_messages(
             );
         }
         let start = Instant::now();
-        if let Err(error) = process_messages(
+        let processed_count = match process_messages(
             plugin_id,
             messages_metadata,
             &topic_metadata,
@@ -289,12 +303,17 @@ async fn consume_messages(
         )
         .await
         {
-            error!(
-                "Failed to process {messages_count} messages for sink connector with ID: {plugin_id}. {error}",
-            );
-            return Err(error);
-        }
+            Ok(count) => count,
+            Err(error) => {
+                error!(
+                    "Failed to process {messages_count} messages for sink connector with ID: {plugin_id}. {error}",
+                );
+                metrics.increment_errors(plugin_key, ConnectorType::Sink);
+                return Err(error);
+            }
+        };
 
+        metrics.increment_messages_processed(plugin_key, processed_count as u64);
         let elapsed = start.elapsed();
         if verbose {
             info!(
@@ -341,7 +360,7 @@ async fn process_messages(
     consume: &ConsumeCallback,
     transforms: &Vec<Arc<dyn Transform>>,
     decoder: &Arc<dyn StreamDecoder>,
-) -> Result<(), RuntimeError> {
+) -> Result<usize, RuntimeError> {
     let messages = messages.into_iter().map(|message| ReceivedMessage {
         id: message.header.id,
         offset: message.header.offset,
@@ -453,6 +472,8 @@ async fn process_messages(
         });
     }
 
+    let processed_count = messages.len();
+
     let topic_meta = postcard::to_allocvec(topic_metadata).map_err(|error| {
         error!(
             "Failed to serialize topic metadata for sink connector with ID: {plugin_id}. {error}"
@@ -486,5 +507,5 @@ async fn process_messages(
         messages.len(),
     );
 
-    Ok(())
+    Ok(processed_count)
 }
diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs
index fba517989..c4f155b5a 100644
--- a/core/connectors/runtime/src/source.rs
+++ b/core/connectors/runtime/src/source.rs
@@ -39,6 +39,7 @@ use crate::configs::connectors::SourceConfig;
 use crate::context::RuntimeContext;
 use crate::log::LOG_CALLBACK;
 use crate::manager::status::ConnectorStatus;
+use crate::metrics::ConnectorType;
 use crate::{
     PLUGIN_ID, RuntimeError, SourceApi, SourceConnector, SourceConnectorPlugin,
     SourceConnectorProducer, SourceConnectorWrapper, resolve_plugin_path,
@@ -256,7 +257,11 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>, context: Arc<RuntimeContext>
 
                 context
                     .sources
-                    .update_status(&plugin_key, ConnectorStatus::Running)
+                    .update_status(
+                        &plugin_key,
+                        ConnectorStatus::Running,
+                        Some(&context.metrics),
+                    )
                     .await;
                 let encoder = producer.encoder.clone();
                 let producer = &producer.producer;
@@ -269,6 +274,9 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>, context: Arc<RuntimeContext>
 
                 while let Ok(produced_messages) = receiver.recv_async().await {
                     let count = produced_messages.messages.len();
+                    context
+                        .metrics
+                        .increment_messages_produced(&plugin_key, count as u64);
                     if plugin.verbose {
                         info!("Source connector with ID: {plugin_id} received {count} messages",);
                     } else {
@@ -313,6 +321,9 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>, context: Arc<RuntimeContext>
                             producer.topic()
                         );
                         error!(err);
+                        context
+                            .metrics
+                            .increment_errors(&plugin_key, ConnectorType::Source);
                         context.sources.set_error(&plugin_key, &err).await;
                         continue;
                     };
@@ -324,10 +335,17 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>, context: Arc<RuntimeContext>
                             producer.topic(),
                         );
                         error!(err);
+                        context
+                            .metrics
+                            .increment_errors(&plugin_key, ConnectorType::Source);
                         context.sources.set_error(&plugin_key, &err).await;
                         continue;
                     }
 
+                    context
+                        .metrics
+                        .increment_messages_sent(&plugin_key, count as u64);
+
                     if plugin.verbose {
                         info!(
                             "Sent {count} messages to stream: {}, topic: {} by source connector with ID: {plugin_id}",
@@ -365,7 +383,11 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>, context: Arc<RuntimeContext>
                 info!("Source connector with ID: {plugin_id} stopped.");
                 context
                     .sources
-                    .update_status(&plugin_key, ConnectorStatus::Stopped)
+                    .update_status(
+                        &plugin_key,
+                        ConnectorStatus::Stopped,
+                        Some(&context.metrics),
+                    )
                     .await;
             });
         }
diff --git a/core/connectors/runtime/src/stats.rs b/core/connectors/runtime/src/stats.rs
new file mode 100644
index 000000000..3effe2ca6
--- /dev/null
+++ b/core/connectors/runtime/src/stats.rs
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::context::RuntimeContext;
+use crate::metrics::ConnectorType;
+use iggy_common::IggyTimestamp;
+use serde::Serialize;
+use std::sync::Arc;
+use sysinfo::System;
+
+#[derive(Debug, Serialize)]
+pub struct ConnectorRuntimeStats {
+    pub process_id: u32,
+    pub cpu_usage: f32,
+    pub memory_usage: u64,
+    pub run_time: u64,
+    pub start_time: u64,
+    pub sources_total: u32,
+    pub sources_running: u32,
+    pub sinks_total: u32,
+    pub sinks_running: u32,
+    pub connectors: Vec<ConnectorStats>,
+}
+
+#[derive(Debug, Serialize)]
+pub struct ConnectorStats {
+    pub key: String,
+    pub name: String,
+    pub connector_type: String,
+    pub status: String,
+    pub enabled: bool,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub messages_produced: Option<u64>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub messages_sent: Option<u64>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub messages_consumed: Option<u64>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub messages_processed: Option<u64>,
+    pub errors: u64,
+}
+
+pub async fn get_runtime_stats(context: &Arc<RuntimeContext>) -> ConnectorRuntimeStats {
+    let pid = std::process::id();
+
+    let mut system = System::new();
+    system.refresh_processes(
+        sysinfo::ProcessesToUpdate::Some(&[sysinfo::Pid::from_u32(pid)]),
+        true,
+    );
+
+    let (cpu_usage, memory_usage) = system
+        .process(sysinfo::Pid::from_u32(pid))
+        .map(|p| (p.cpu_usage(), p.memory()))
+        .unwrap_or((0.0, 0));
+
+    let sources = context.sources.get_all().await;
+    let sinks = context.sinks.get_all().await;
+
+    let sources_total = context.metrics.get_sources_total();
+    let sinks_total = context.metrics.get_sinks_total();
+    let sources_running = context.metrics.get_sources_running();
+    let sinks_running = context.metrics.get_sinks_running();
+
+    let mut connectors = Vec::with_capacity(sources.len() + sinks.len());
+    for source in &sources {
+        connectors.push(ConnectorStats {
+            key: source.key.clone(),
+            name: source.name.clone(),
+            connector_type: "source".to_owned(),
+            status: source.status.to_string(),
+            enabled: source.enabled,
+            messages_produced: Some(context.metrics.get_messages_produced(&source.key)),
+            messages_sent: Some(context.metrics.get_messages_sent(&source.key)),
+            messages_consumed: None,
+            messages_processed: None,
+            errors: context
+                .metrics
+                .get_errors(&source.key, ConnectorType::Source),
+        });
+    }
+    for sink in &sinks {
+        connectors.push(ConnectorStats {
+            key: sink.key.clone(),
+            name: sink.name.clone(),
+            connector_type: "sink".to_owned(),
+            status: sink.status.to_string(),
+            enabled: sink.enabled,
+            messages_produced: None,
+            messages_sent: None,
+            messages_consumed: Some(context.metrics.get_messages_consumed(&sink.key)),
+            messages_processed: Some(context.metrics.get_messages_processed(&sink.key)),
+            errors: context.metrics.get_errors(&sink.key, ConnectorType::Sink),
+        });
+    }
+
+    let now = IggyTimestamp::now().as_micros();
+    let start = context.start_time.as_micros();
+    let run_time = now.saturating_sub(start);
+
+    ConnectorRuntimeStats {
+        process_id: pid,
+        cpu_usage,
+        memory_usage,
+        run_time,
+        start_time: start,
+        sources_total,
+        sources_running,
+        sinks_total,
+        sinks_running,
+        connectors,
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_connector_stats_source_serialization() {
+        let stats = ConnectorStats {
+            key: "test-source".to_owned(),
+            name: "Test Source".to_owned(),
+            connector_type: "source".to_owned(),
+            status: "running".to_owned(),
+            enabled: true,
+            messages_produced: Some(100),
+            messages_sent: Some(95),
+            messages_consumed: None,
+            messages_processed: None,
+            errors: 5,
+        };
+
+        let json = serde_json::to_string(&stats).unwrap();
+        assert!(json.contains("\"key\":\"test-source\""));
+        assert!(json.contains("\"connector_type\":\"source\""));
+        assert!(json.contains("\"messages_produced\":100"));
+        assert!(json.contains("\"messages_sent\":95"));
+        // None fields should not be serialized
+        assert!(!json.contains("messages_consumed"));
+        assert!(!json.contains("messages_processed"));
+        assert!(json.contains("\"errors\":5"));
+    }
+
+    #[test]
+    fn test_connector_stats_sink_serialization() {
+        let stats = ConnectorStats {
+            key: "test-sink".to_owned(),
+            name: "Test Sink".to_owned(),
+            connector_type: "sink".to_owned(),
+            status: "running".to_owned(),
+            enabled: true,
+            messages_produced: None,
+            messages_sent: None,
+            messages_consumed: Some(200),
+            messages_processed: Some(195),
+            errors: 3,
+        };
+
+        let json = serde_json::to_string(&stats).unwrap();
+        assert!(json.contains("\"key\":\"test-sink\""));
+        assert!(json.contains("\"connector_type\":\"sink\""));
+        // None fields should not be serialized
+        assert!(!json.contains("messages_produced"));
+        assert!(!json.contains("messages_sent"));
+        assert!(json.contains("\"messages_consumed\":200"));
+        assert!(json.contains("\"messages_processed\":195"));
+        assert!(json.contains("\"errors\":3"));
+    }
+
+    #[test]
+    fn test_connector_runtime_stats_serialization() {
+        let stats = ConnectorRuntimeStats {
+            process_id: 12345,
+            cpu_usage: 25.5,
+            memory_usage: 1024000,
+            run_time: 60000000,
+            start_time: 1700000000000000,
+            sources_total: 2,
+            sources_running: 1,
+            sinks_total: 3,
+            sinks_running: 2,
+            connectors: vec![
+                ConnectorStats {
+                    key: "source-1".to_owned(),
+                    name: "Source One".to_owned(),
+                    connector_type: "source".to_owned(),
+                    status: "running".to_owned(),
+                    enabled: true,
+                    messages_produced: Some(100),
+                    messages_sent: Some(100),
+                    messages_consumed: None,
+                    messages_processed: None,
+                    errors: 0,
+                },
+                ConnectorStats {
+                    key: "sink-1".to_owned(),
+                    name: "Sink One".to_owned(),
+                    connector_type: "sink".to_owned(),
+                    status: "stopped".to_owned(),
+                    enabled: false,
+                    messages_produced: None,
+                    messages_sent: None,
+                    messages_consumed: Some(50),
+                    messages_processed: Some(50),
+                    errors: 0,
+                },
+            ],
+        };
+
+        let json = serde_json::to_string(&stats).unwrap();
+        assert!(json.contains("\"process_id\":12345"));
+        assert!(json.contains("\"sources_total\":2"));
+        assert!(json.contains("\"sources_running\":1"));
+        assert!(json.contains("\"sinks_total\":3"));
+        assert!(json.contains("\"sinks_running\":2"));
+        assert!(json.contains("\"connectors\":["));
+        assert!(json.contains("\"messages_produced\":100"));
+        assert!(json.contains("\"messages_consumed\":50"));
+    }
+
+    #[test]
+    fn test_connector_runtime_stats_empty_connectors() {
+        let stats = ConnectorRuntimeStats {
+            process_id: 1,
+            cpu_usage: 0.0,
+            memory_usage: 0,
+            run_time: 0,
+            start_time: 0,
+            sources_total: 0,
+            sources_running: 0,
+            sinks_total: 0,
+            sinks_running: 0,
+            connectors: vec![],
+        };
+
+        let json = serde_json::to_string(&stats).unwrap();
+        assert!(json.contains("\"connectors\":[]"));
+    }
+}
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index d311bc870..8adfda896 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -84,7 +84,7 @@ opentelemetry-otlp = { workspace = true }
 opentelemetry-semantic-conventions = { workspace = true }
 opentelemetry_sdk = { workspace = true }
 papaya = "0.2.3"
-prometheus-client = "0.24.0"
+prometheus-client = { workspace = true }
 rand = { workspace = true }
 reqwest = { workspace = true, features = ["rustls-tls-no-provider"] }
 ringbuffer = "0.16.0"

Reply via email to