This is an automated email from the ASF dual-hosted git repository.

maciej pushed a commit to branch connectors-runtime-metrics
in repository https://gitbox.apache.org/repos/asf/iggy.git
commit edc5f6b78849064c8e7177fac16130ece63678b0 Author: Maciej Modzelewski <[email protected]> AuthorDate: Wed Jan 28 15:23:33 2026 +0100 feat(connectors): add Prometheus metrics and stats endpoints - Add /metrics endpoint for Prometheus scraping (configurable) - Add /stats endpoint returning JSON runtime statistics - Track runtime gauges: sources/sinks total and running counts - Track per-connector counters: messages produced/sent (source), consumed/processed (sink), and errors - Add cpu/memory usage and uptime to stats response --- Cargo.lock | 2 + Cargo.toml | 1 + core/connectors/runtime/Cargo.toml | 2 + core/connectors/runtime/README.md | 25 ++ core/connectors/runtime/config.toml | 4 + core/connectors/runtime/runtime.http | 6 + core/connectors/runtime/src/api/config.rs | 72 +++- core/connectors/runtime/src/api/mod.rs | 22 +- core/connectors/runtime/src/context.rs | 17 +- core/connectors/runtime/src/main.rs | 2 + core/connectors/runtime/src/manager/sink.rs | 18 +- core/connectors/runtime/src/manager/source.rs | 18 +- core/connectors/runtime/src/metrics.rs | 478 ++++++++++++++++++++++++++ core/connectors/runtime/src/sink.rs | 39 ++- core/connectors/runtime/src/source.rs | 26 +- core/connectors/runtime/src/stats.rs | 256 ++++++++++++++ core/server/Cargo.toml | 2 +- 17 files changed, 971 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5b992c303..8353cef63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4565,6 +4565,7 @@ dependencies = [ "opentelemetry-semantic-conventions", "opentelemetry_sdk", "postcard", + "prometheus-client", "reqwest", "reqwest-middleware", "reqwest-retry", @@ -4574,6 +4575,7 @@ dependencies = [ "serde_with", "serde_yaml_ng", "strum", + "sysinfo 0.38.0", "tempfile", "thiserror 2.0.18", "tokio", diff --git a/Cargo.toml b/Cargo.toml index e2aba3ca2..e6797945c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -179,6 +179,7 @@ passterm = "=2.0.1" postcard = { version = "1.1.3", features = ["alloc"] } predicates = "3.1.3" proc-macro2 = "1" +prometheus-client = "0.24.0" quinn = "0.11.9" quote = "1" rand = "0.9.2" diff --git a/core/connectors/runtime/Cargo.toml b/core/connectors/runtime/Cargo.toml index 970c1a7e8..bdb8bd307 100644 --- a/core/connectors/runtime/Cargo.toml +++ b/core/connectors/runtime/Cargo.toml @@ -56,6 +56,7 @@ opentelemetry-otlp = { workspace = true } opentelemetry-semantic-conventions = { workspace = true } opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] } postcard = { workspace = true } +prometheus-client = { workspace = true } reqwest = { workspace = true } reqwest-middleware = { workspace = true } reqwest-retry = { workspace = true } @@ -65,6 +66,7 @@ serde_json = { workspace = true } serde_with = { workspace = true } serde_yaml_ng = { workspace = true } strum = { workspace = true } +sysinfo = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } toml = { workspace = true } diff --git a/core/connectors/runtime/README.md b/core/connectors/runtime/README.md index 8b0d2584c..6433fb822 100644 --- a/core/connectors/runtime/README.md +++ b/core/connectors/runtime/README.md @@ -134,6 +134,10 @@ exposed_headers = [""] allow_credentials = false allow_private_network = false +[http.metrics] # Optional Prometheus metrics configuration +enabled = false +endpoint = "/metrics" + [http.tls] # Optional TLS configuration for HTTP API enabled = false cert_file = "core/certs/iggy_cert.pem" @@ -144,6 +148,8 @@ Currently, it does expose the following endpoints: - `GET /`: welcome message. 
- `GET /health`: health status of the runtime. +- `GET /stats`: runtime statistics including process info, memory/CPU usage, and connector status. +- `GET /metrics`: Prometheus-formatted metrics (when `http.metrics.enabled` is `true`). - `GET /sinks`: list of sinks. - `GET /sinks/{key}`: sink details. - `GET /sinks/{key}/configs`: list of configuration versions for the sink. @@ -180,3 +186,22 @@ endpoint = "http://localhost:4317" transport = "grpc" # Options: "grpc", "http" endpoint = "http://localhost:4317" ``` + +## Metrics + +The runtime exposes Prometheus-compatible metrics via the `/metrics` endpoint when enabled. The following metrics are available: + +### Runtime Gauges + +- `iggy_connectors_sources_total`: Total configured source connectors +- `iggy_connectors_sources_running`: Sources currently in Running status +- `iggy_connectors_sinks_total`: Total configured sink connectors +- `iggy_connectors_sinks_running`: Sinks currently in Running status + +### Per-Connector Counters (labeled with `connector_key` and `connector_type`) + +- `iggy_connector_messages_produced_total`: Messages received from source plugin +- `iggy_connector_messages_sent_total`: Messages sent to Iggy (source) +- `iggy_connector_messages_consumed_total`: Messages consumed from Iggy (sink) +- `iggy_connector_messages_processed_total`: Messages processed and sent to sink plugin +- `iggy_connector_errors_total`: Errors encountered diff --git a/core/connectors/runtime/config.toml b/core/connectors/runtime/config.toml index abfa1ab50..247a67e6b 100644 --- a/core/connectors/runtime/config.toml +++ b/core/connectors/runtime/config.toml @@ -29,6 +29,10 @@ exposed_headers = [""] allow_credentials = false allow_private_network = false +[http.metrics] # Optional Prometheus metrics configuration +enabled = false +endpoint = "/metrics" + [http.tls] # Optional TLS configuration for HTTP API enabled = false cert_file = "core/certs/iggy_cert.pem" diff --git a/core/connectors/runtime/runtime.http b/core/connectors/runtime/runtime.http index 7c08bef4f..3205c7b8b 100644 --- a/core/connectors/runtime/runtime.http +++ b/core/connectors/runtime/runtime.http @@ -27,6 +27,12 @@ GET {{url}} ### GET {{url}}/health +### +GET {{url}}/stats + +### +GET {{url}}/metrics + ### GET {{url}}/sinks diff --git a/core/connectors/runtime/src/api/config.rs b/core/connectors/runtime/src/api/config.rs index 8efdb8ae5..596a48c60 100644 --- a/core/connectors/runtime/src/api/config.rs +++ b/core/connectors/runtime/src/api/config.rs @@ -39,6 +39,13 @@ pub struct HttpConfig { pub api_key: String, pub cors: HttpCorsConfig, pub tls: HttpTlsConfig, + pub metrics: HttpMetricsConfig, +} + +#[derive(Debug, Clone, Deserialize, Serialize, ConfigEnv)] +pub struct HttpMetricsConfig { + pub enabled: bool, + pub endpoint: String, } #[derive(Debug, Default, Deserialize, Serialize, Clone, ConfigEnv)] @@ -135,6 +142,25 @@ pub fn configure_cors(config: &HttpCorsConfig) -> CorsLayer { .allow_private_network(config.allow_private_network) } +impl Default for HttpMetricsConfig { + fn default() -> Self { + Self { + enabled: false, + endpoint: "/metrics".to_owned(), + } + } +} + +impl std::fmt::Display for HttpMetricsConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{{ enabled: {}, endpoint: {} }}", + self.enabled, self.endpoint + ) + } +} + impl Default for HttpConfig { fn default() -> Self { Self { @@ -143,6 +169,7 @@ impl Default for HttpConfig { api_key: "".to_owned(), cors: HttpCorsConfig::default(), tls: HttpTlsConfig::default(), + 
metrics: HttpMetricsConfig::default(), } } } @@ -151,8 +178,8 @@ impl std::fmt::Display for HttpConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "{{ address: {}, api_key: {}, cors: {}, tls: {} }}", - self.address, self.api_key, self.cors, self.tls + "{{ address: {}, api_key: {}, cors: {}, tls: {}, metrics: {} }}", + self.address, self.api_key, self.cors, self.tls, self.metrics ) } } @@ -182,3 +209,44 @@ impl std::fmt::Display for HttpCorsConfig { ) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_http_metrics_config_default() { + let config = HttpMetricsConfig::default(); + + assert!(!config.enabled); + assert_eq!(config.endpoint, "/metrics"); + } + + #[test] + fn test_http_metrics_config_display() { + let config = HttpMetricsConfig { + enabled: true, + endpoint: "/custom-metrics".to_owned(), + }; + + let display = format!("{}", config); + assert!(display.contains("enabled: true")); + assert!(display.contains("endpoint: /custom-metrics")); + } + + #[test] + fn test_http_config_default_includes_metrics() { + let config = HttpConfig::default(); + + assert!(!config.metrics.enabled); + assert_eq!(config.metrics.endpoint, "/metrics"); + } + + #[test] + fn test_http_config_display_includes_metrics() { + let config = HttpConfig::default(); + + let display = format!("{}", config); + assert!(display.contains("metrics:")); + } +} diff --git a/core/connectors/runtime/src/api/mod.rs b/core/connectors/runtime/src/api/mod.rs index 85b2d4400..d8dea6b82 100644 --- a/core/connectors/runtime/src/api/mod.rs +++ b/core/connectors/runtime/src/api/mod.rs @@ -18,8 +18,9 @@ */ use crate::context::RuntimeContext; +use crate::stats; use auth::resolve_api_key; -use axum::{Json, Router, middleware, routing::get}; +use axum::{Json, Router, extract::State, middleware, routing::get}; use axum_server::tls_rustls::RustlsConfig; use config::{HttpConfig, configure_cors}; use std::{net::SocketAddr, path::PathBuf, sync::Arc}; @@ -41,12 +42,21 @@ pub async fn init(config: &HttpConfig, context: Arc<RuntimeContext>) { return; } + let mut system_router = Router::new().route("/stats", get(get_stats)); + + if config.metrics.enabled { + system_router = system_router.route(&config.metrics.endpoint, get(get_metrics)); + } + + let system_router = system_router.with_state(context.clone()); + let mut app = Router::new() .route("/", get(|| async { "Connector Runtime API" })) .route( "/health", get(|| async { Json(serde_json::json!({ "status": "healthy" })) }), ) + .merge(system_router) .merge(sink::router(context.clone())) .merge(source::router(context.clone())); @@ -111,3 +121,13 @@ pub async fn init(config: &HttpConfig, context: Arc<RuntimeContext>) { } }); } + +async fn get_metrics(State(context): State<Arc<RuntimeContext>>) -> String { + context.metrics.get_formatted_output() +} + +async fn get_stats( + State(context): State<Arc<RuntimeContext>>, +) -> Json<stats::ConnectorRuntimeStats> { + Json(stats::get_runtime_stats(&context).await) +} diff --git a/core/connectors/runtime/src/context.rs b/core/connectors/runtime/src/context.rs index fe2a8e034..20d9906ca 100644 --- a/core/connectors/runtime/src/context.rs +++ b/core/connectors/runtime/src/context.rs @@ -19,6 +19,7 @@ use crate::configs::connectors::{ConnectorsConfigProvider, SinkConfig, SourceConfig}; use crate::configs::runtime::ConnectorsRuntimeConfig; use crate::manager::status::ConnectorError; +use crate::metrics::Metrics; use crate::{ SinkConnectorWrapper, SourceConnectorWrapper, manager::{ @@ -27,6 +28,7 @@ use crate::{ 
status::ConnectorStatus, }, }; +use iggy_common::IggyTimestamp; use std::collections::HashMap; use std::sync::Arc; use tracing::error; @@ -36,6 +38,8 @@ pub struct RuntimeContext { pub sources: SourceManager, pub api_key: String, pub config_provider: Arc<dyn ConnectorsConfigProvider>, + pub metrics: Arc<Metrics>, + pub start_time: IggyTimestamp, } pub fn init( @@ -46,11 +50,20 @@ pub fn init( source_wrappers: &[SourceConnectorWrapper], config_provider: Box<dyn ConnectorsConfigProvider>, ) -> RuntimeContext { + let metrics = Arc::new(Metrics::init()); + let sinks = SinkManager::new(map_sinks(sinks_config, sink_wrappers)); + let sources = SourceManager::new(map_sources(sources_config, source_wrappers)); + + metrics.set_sinks_total(sinks_config.len() as u32); + metrics.set_sources_total(sources_config.len() as u32); + RuntimeContext { - sinks: SinkManager::new(map_sinks(sinks_config, sink_wrappers)), - sources: SourceManager::new(map_sources(sources_config, source_wrappers)), + sinks, + sources, api_key: config.http.api_key.to_owned(), config_provider: Arc::from(config_provider), + metrics, + start_time: IggyTimestamp::now(), } } diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs index 657ed646a..b71bd5eef 100644 --- a/core/connectors/runtime/src/main.rs +++ b/core/connectors/runtime/src/main.rs @@ -48,9 +48,11 @@ pub(crate) mod context; pub(crate) mod error; mod log; mod manager; +pub(crate) mod metrics; mod sink; mod source; mod state; +pub(crate) mod stats; mod stream; mod transform; diff --git a/core/connectors/runtime/src/manager/sink.rs b/core/connectors/runtime/src/manager/sink.rs index 93ad280bd..495693716 100644 --- a/core/connectors/runtime/src/manager/sink.rs +++ b/core/connectors/runtime/src/manager/sink.rs @@ -18,6 +18,7 @@ */ use super::status::{ConnectorError, ConnectorStatus}; use crate::configs::connectors::{ConfigFormat, SinkConfig}; +use crate::metrics::Metrics; use dashmap::DashMap; use std::collections::HashMap; use std::sync::Arc; @@ -63,13 +64,28 @@ impl SinkManager { results } - pub async fn update_status(&self, key: &str, status: ConnectorStatus) { + pub async fn update_status( + &self, + key: &str, + status: ConnectorStatus, + metrics: Option<&Arc<Metrics>>, + ) { if let Some(sink) = self.sinks.get(key) { let mut sink = sink.lock().await; + let old_status = sink.info.status; sink.info.status = status; if matches!(status, ConnectorStatus::Running | ConnectorStatus::Stopped) { sink.info.last_error = None; } + if let Some(metrics) = metrics { + if old_status != ConnectorStatus::Running && status == ConnectorStatus::Running { + metrics.increment_sinks_running(); + } else if old_status == ConnectorStatus::Running + && status != ConnectorStatus::Running + { + metrics.decrement_sinks_running(); + } + } } } diff --git a/core/connectors/runtime/src/manager/source.rs b/core/connectors/runtime/src/manager/source.rs index 801a567e7..051668640 100644 --- a/core/connectors/runtime/src/manager/source.rs +++ b/core/connectors/runtime/src/manager/source.rs @@ -18,6 +18,7 @@ */ use super::status::{ConnectorError, ConnectorStatus}; use crate::configs::connectors::{ConfigFormat, SourceConfig}; +use crate::metrics::Metrics; use dashmap::DashMap; use std::collections::HashMap; use std::sync::Arc; @@ -63,13 +64,28 @@ impl SourceManager { results } - pub async fn update_status(&self, key: &str, status: ConnectorStatus) { + pub async fn update_status( + &self, + key: &str, + status: ConnectorStatus, + metrics: Option<&Arc<Metrics>>, + ) { if let 
Some(source) = self.sources.get(key) { let mut source = source.lock().await; + let old_status = source.info.status; source.info.status = status; if matches!(status, ConnectorStatus::Running | ConnectorStatus::Stopped) { source.info.last_error = None; } + if let Some(metrics) = metrics { + if old_status != ConnectorStatus::Running && status == ConnectorStatus::Running { + metrics.increment_sources_running(); + } else if old_status == ConnectorStatus::Running + && status != ConnectorStatus::Running + { + metrics.decrement_sources_running(); + } + } } } diff --git a/core/connectors/runtime/src/metrics.rs b/core/connectors/runtime/src/metrics.rs new file mode 100644 index 000000000..8d9c29cdb --- /dev/null +++ b/core/connectors/runtime/src/metrics.rs @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use prometheus_client::encoding::text::encode; +use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue}; +use prometheus_client::metrics::counter::Counter; +use prometheus_client::metrics::family::Family; +use prometheus_client::metrics::gauge::Gauge; +use prometheus_client::registry::Registry; +use std::sync::Arc; +use tracing::error; + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +pub struct ConnectorLabels { + pub connector_key: String, + pub connector_type: ConnectorType, +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)] +pub enum ConnectorType { + Source, + Sink, +} + +#[derive(Debug, Clone)] +pub struct Metrics { + registry: Arc<Registry>, + sources_total: Gauge, + sources_running: Gauge, + sinks_total: Gauge, + sinks_running: Gauge, + messages_produced: Family<ConnectorLabels, Counter>, + messages_sent: Family<ConnectorLabels, Counter>, + messages_consumed: Family<ConnectorLabels, Counter>, + messages_processed: Family<ConnectorLabels, Counter>, + errors: Family<ConnectorLabels, Counter>, +} + +impl Metrics { + pub fn init() -> Self { + let mut registry = Registry::default(); + + let sources_total = Gauge::default(); + let sources_running = Gauge::default(); + let sinks_total = Gauge::default(); + let sinks_running = Gauge::default(); + let messages_produced = Family::<ConnectorLabels, Counter>::default(); + let messages_sent = Family::<ConnectorLabels, Counter>::default(); + let messages_consumed = Family::<ConnectorLabels, Counter>::default(); + let messages_processed = Family::<ConnectorLabels, Counter>::default(); + let errors = Family::<ConnectorLabels, Counter>::default(); + + registry.register( + "iggy_connectors_sources_total", + "Total configured source connectors", + sources_total.clone(), + ); + registry.register( + "iggy_connectors_sources_running", + "Sources in Running status", + sources_running.clone(), + ); + registry.register( + 
"iggy_connectors_sinks_total", + "Total configured sink connectors", + sinks_total.clone(), + ); + registry.register( + "iggy_connectors_sinks_running", + "Sinks in Running status", + sinks_running.clone(), + ); + registry.register( + "iggy_connector_messages_produced_total", + "Messages received from source plugin poll", + messages_produced.clone(), + ); + registry.register( + "iggy_connector_messages_sent_total", + "Messages sent to Iggy (source)", + messages_sent.clone(), + ); + registry.register( + "iggy_connector_messages_consumed_total", + "Messages consumed from Iggy (sink)", + messages_consumed.clone(), + ); + registry.register( + "iggy_connector_messages_processed_total", + "Messages processed and sent to sink plugin", + messages_processed.clone(), + ); + registry.register( + "iggy_connector_errors_total", + "Errors encountered", + errors.clone(), + ); + + Self { + registry: Arc::new(registry), + sources_total, + sources_running, + sinks_total, + sinks_running, + messages_produced, + messages_sent, + messages_consumed, + messages_processed, + errors, + } + } + + pub fn get_formatted_output(&self) -> String { + let mut buffer = String::new(); + if let Err(err) = encode(&mut buffer, &self.registry) { + error!("Failed to encode metrics: {}", err); + } + buffer + } + + pub fn set_sources_total(&self, count: u32) { + self.sources_total.set(count as i64); + } + + pub fn set_sinks_total(&self, count: u32) { + self.sinks_total.set(count as i64); + } + + pub fn get_sources_total(&self) -> u32 { + self.sources_total.get() as u32 + } + + pub fn get_sinks_total(&self) -> u32 { + self.sinks_total.get() as u32 + } + + pub fn get_sources_running(&self) -> u32 { + self.sources_running.get() as u32 + } + + pub fn get_sinks_running(&self) -> u32 { + self.sinks_running.get() as u32 + } + + pub fn increment_sources_running(&self) { + self.sources_running.inc(); + } + + pub fn decrement_sources_running(&self) { + self.sources_running.dec(); + } + + pub fn increment_sinks_running(&self) { + self.sinks_running.inc(); + } + + pub fn decrement_sinks_running(&self) { + self.sinks_running.dec(); + } + + pub fn increment_messages_produced(&self, key: &str, count: u64) { + self.messages_produced + .get_or_create(&ConnectorLabels { + connector_key: key.to_owned(), + connector_type: ConnectorType::Source, + }) + .inc_by(count); + } + + pub fn increment_messages_sent(&self, key: &str, count: u64) { + self.messages_sent + .get_or_create(&ConnectorLabels { + connector_key: key.to_owned(), + connector_type: ConnectorType::Source, + }) + .inc_by(count); + } + + pub fn increment_messages_consumed(&self, key: &str, count: u64) { + self.messages_consumed + .get_or_create(&ConnectorLabels { + connector_key: key.to_owned(), + connector_type: ConnectorType::Sink, + }) + .inc_by(count); + } + + pub fn increment_messages_processed(&self, key: &str, count: u64) { + self.messages_processed + .get_or_create(&ConnectorLabels { + connector_key: key.to_owned(), + connector_type: ConnectorType::Sink, + }) + .inc_by(count); + } + + pub fn increment_errors(&self, key: &str, connector_type: ConnectorType) { + self.errors + .get_or_create(&ConnectorLabels { + connector_key: key.to_owned(), + connector_type, + }) + .inc(); + } + + pub fn get_messages_produced(&self, key: &str) -> u64 { + self.messages_produced + .get_or_create(&ConnectorLabels { + connector_key: key.to_owned(), + connector_type: ConnectorType::Source, + }) + .get() + } + + pub fn get_messages_sent(&self, key: &str) -> u64 { + self.messages_sent + 
.get_or_create(&ConnectorLabels { + connector_key: key.to_owned(), + connector_type: ConnectorType::Source, + }) + .get() + } + + pub fn get_messages_consumed(&self, key: &str) -> u64 { + self.messages_consumed + .get_or_create(&ConnectorLabels { + connector_key: key.to_owned(), + connector_type: ConnectorType::Sink, + }) + .get() + } + + pub fn get_messages_processed(&self, key: &str) -> u64 { + self.messages_processed + .get_or_create(&ConnectorLabels { + connector_key: key.to_owned(), + connector_type: ConnectorType::Sink, + }) + .get() + } + + pub fn get_errors(&self, key: &str, connector_type: ConnectorType) -> u64 { + self.errors + .get_or_create(&ConnectorLabels { + connector_key: key.to_owned(), + connector_type, + }) + .get() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_metrics_init() { + let metrics = Metrics::init(); + let output = metrics.get_formatted_output(); + + assert!(output.contains("iggy_connectors_sources_total")); + assert!(output.contains("iggy_connectors_sources_running")); + assert!(output.contains("iggy_connectors_sinks_total")); + assert!(output.contains("iggy_connectors_sinks_running")); + } + + #[test] + fn test_set_sources_total() { + let metrics = Metrics::init(); + metrics.set_sources_total(5); + + let output = metrics.get_formatted_output(); + assert!(output.contains("iggy_connectors_sources_total 5")); + } + + #[test] + fn test_set_sinks_total() { + let metrics = Metrics::init(); + metrics.set_sinks_total(3); + + let output = metrics.get_formatted_output(); + assert!(output.contains("iggy_connectors_sinks_total 3")); + } + + #[test] + fn test_increment_sources_running() { + let metrics = Metrics::init(); + metrics.increment_sources_running(); + metrics.increment_sources_running(); + + let output = metrics.get_formatted_output(); + assert!(output.contains("iggy_connectors_sources_running 2")); + } + + #[test] + fn test_decrement_sources_running() { + let metrics = Metrics::init(); + metrics.increment_sources_running(); + metrics.increment_sources_running(); + metrics.decrement_sources_running(); + + let output = metrics.get_formatted_output(); + assert!(output.contains("iggy_connectors_sources_running 1")); + } + + #[test] + fn test_increment_sinks_running() { + let metrics = Metrics::init(); + metrics.increment_sinks_running(); + + let output = metrics.get_formatted_output(); + assert!(output.contains("iggy_connectors_sinks_running 1")); + } + + #[test] + fn test_decrement_sinks_running() { + let metrics = Metrics::init(); + metrics.increment_sinks_running(); + metrics.increment_sinks_running(); + metrics.decrement_sinks_running(); + + let output = metrics.get_formatted_output(); + assert!(output.contains("iggy_connectors_sinks_running 1")); + } + + #[test] + fn test_increment_messages_produced() { + let metrics = Metrics::init(); + metrics.increment_messages_produced("test-source", 10); + + let output = metrics.get_formatted_output(); + assert!(output.contains("iggy_connector_messages_produced_total")); + assert!(output.contains("connector_key=\"test-source\"")); + assert!(output.contains("connector_type=\"Source\"")); + } + + #[test] + fn test_increment_messages_sent() { + let metrics = Metrics::init(); + metrics.increment_messages_sent("test-source", 5); + + let output = metrics.get_formatted_output(); + assert!(output.contains("iggy_connector_messages_sent_total")); + assert!(output.contains("connector_key=\"test-source\"")); + } + + #[test] + fn test_increment_messages_consumed() { + let metrics = Metrics::init(); + 
metrics.increment_messages_consumed("test-sink", 20); + + let output = metrics.get_formatted_output(); + assert!(output.contains("iggy_connector_messages_consumed_total")); + assert!(output.contains("connector_key=\"test-sink\"")); + assert!(output.contains("connector_type=\"Sink\"")); + } + + #[test] + fn test_increment_messages_processed() { + let metrics = Metrics::init(); + metrics.increment_messages_processed("test-sink", 15); + + let output = metrics.get_formatted_output(); + assert!(output.contains("iggy_connector_messages_processed_total")); + assert!(output.contains("connector_key=\"test-sink\"")); + } + + #[test] + fn test_increment_errors_source() { + let metrics = Metrics::init(); + metrics.increment_errors("test-source", ConnectorType::Source); + + let output = metrics.get_formatted_output(); + assert!(output.contains("iggy_connector_errors_total")); + assert!(output.contains("connector_key=\"test-source\"")); + assert!(output.contains("connector_type=\"Source\"")); + } + + #[test] + fn test_increment_errors_sink() { + let metrics = Metrics::init(); + metrics.increment_errors("test-sink", ConnectorType::Sink); + + let output = metrics.get_formatted_output(); + assert!(output.contains("iggy_connector_errors_total")); + assert!(output.contains("connector_key=\"test-sink\"")); + assert!(output.contains("connector_type=\"Sink\"")); + } + + #[test] + fn test_multiple_connectors() { + let metrics = Metrics::init(); + metrics.increment_messages_produced("source-1", 10); + metrics.increment_messages_produced("source-2", 20); + metrics.increment_messages_consumed("sink-1", 15); + + let output = metrics.get_formatted_output(); + assert!(output.contains("connector_key=\"source-1\"")); + assert!(output.contains("connector_key=\"source-2\"")); + assert!(output.contains("connector_key=\"sink-1\"")); + } + + #[test] + fn test_cumulative_counts() { + let metrics = Metrics::init(); + metrics.increment_messages_produced("test-source", 10); + metrics.increment_messages_produced("test-source", 5); + + let output = metrics.get_formatted_output(); + // Verify cumulative behavior - the same key should accumulate + assert!(output.contains("iggy_connector_messages_produced_total")); + assert!(output.contains("connector_key=\"test-source\"")); + // Check the value is 15 (10 + 5) + assert!(output.contains("} 15")); + } + + #[test] + fn test_get_messages_produced() { + let metrics = Metrics::init(); + metrics.increment_messages_produced("test-source", 10); + metrics.increment_messages_produced("test-source", 5); + + assert_eq!(metrics.get_messages_produced("test-source"), 15); + assert_eq!(metrics.get_messages_produced("nonexistent"), 0); + } + + #[test] + fn test_get_messages_sent() { + let metrics = Metrics::init(); + metrics.increment_messages_sent("test-source", 20); + + assert_eq!(metrics.get_messages_sent("test-source"), 20); + } + + #[test] + fn test_get_messages_consumed() { + let metrics = Metrics::init(); + metrics.increment_messages_consumed("test-sink", 30); + + assert_eq!(metrics.get_messages_consumed("test-sink"), 30); + } + + #[test] + fn test_get_messages_processed() { + let metrics = Metrics::init(); + metrics.increment_messages_processed("test-sink", 25); + + assert_eq!(metrics.get_messages_processed("test-sink"), 25); + } + + #[test] + fn test_get_errors() { + let metrics = Metrics::init(); + metrics.increment_errors("test-source", ConnectorType::Source); + metrics.increment_errors("test-source", ConnectorType::Source); + metrics.increment_errors("test-sink", ConnectorType::Sink); + + 
assert_eq!(metrics.get_errors("test-source", ConnectorType::Source), 2); + assert_eq!(metrics.get_errors("test-sink", ConnectorType::Sink), 1); + assert_eq!(metrics.get_errors("nonexistent", ConnectorType::Source), 0); + } +} diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs index 40b41d886..0987eb7e5 100644 --- a/core/connectors/runtime/src/sink.rs +++ b/core/connectors/runtime/src/sink.rs @@ -21,6 +21,7 @@ use crate::configs::connectors::SinkConfig; use crate::context::RuntimeContext; use crate::log::LOG_CALLBACK; use crate::manager::status::ConnectorStatus; +use crate::metrics::{ConnectorType, Metrics}; use crate::{ PLUGIN_ID, RuntimeError, SinkApi, SinkConnector, SinkConnectorConsumer, SinkConnectorPlugin, SinkConnectorWrapper, resolve_plugin_path, transform, @@ -196,7 +197,11 @@ pub fn consume(sinks: Vec<SinkConnectorWrapper>, context: Arc<RuntimeContext>) { tokio::spawn(async move { context .sinks - .update_status(&plugin_key, ConnectorStatus::Running) + .update_status( + &plugin_key, + ConnectorStatus::Running, + Some(&context.metrics), + ) .await; if let Err(error) = consume_messages( @@ -207,6 +212,8 @@ pub fn consume(sinks: Vec<SinkConnectorWrapper>, context: Arc<RuntimeContext>) { consumer.transforms, consumer.consumer, plugin.verbose, + &plugin_key, + &context.metrics, ) .await { @@ -215,6 +222,9 @@ pub fn consume(sinks: Vec<SinkConnectorWrapper>, context: Arc<RuntimeContext>) { plugin.id ); error!(err); + context + .metrics + .increment_errors(&plugin_key, ConnectorType::Sink); context.sinks.set_error(&plugin_key, &err).await; return; } @@ -228,6 +238,7 @@ pub fn consume(sinks: Vec<SinkConnectorWrapper>, context: Arc<RuntimeContext>) { } } +#[allow(clippy::too_many_arguments)] async fn consume_messages( plugin_id: u32, decoder: Arc<dyn StreamDecoder>, @@ -236,6 +247,8 @@ async fn consume_messages( transforms: Vec<Arc<dyn Transform>>, mut consumer: IggyConsumer, verbose: bool, + plugin_key: &str, + metrics: &Arc<Metrics>, ) -> Result<(), RuntimeError> { info!("Started consuming messages for sink connector with ID: {plugin_id}"); let batch_size = batch_size as usize; @@ -261,6 +274,7 @@ async fn consume_messages( let messages = std::mem::take(&mut batch); let messages_count = messages.len(); + metrics.increment_messages_consumed(plugin_key, messages_count as u64); let messages_metadata = MessagesMetadata { partition_id, current_offset, @@ -278,7 +292,7 @@ async fn consume_messages( ); } let start = Instant::now(); - if let Err(error) = process_messages( + let processed_count = match process_messages( plugin_id, messages_metadata, &topic_metadata, @@ -289,12 +303,17 @@ async fn consume_messages( ) .await { - error!( - "Failed to process {messages_count} messages for sink connector with ID: {plugin_id}. {error}", - ); - return Err(error); - } + Ok(count) => count, + Err(error) => { + error!( + "Failed to process {messages_count} messages for sink connector with ID: {plugin_id}. 
{error}", + ); + metrics.increment_errors(plugin_key, ConnectorType::Sink); + return Err(error); + } + }; + metrics.increment_messages_processed(plugin_key, processed_count as u64); let elapsed = start.elapsed(); if verbose { info!( @@ -341,7 +360,7 @@ async fn process_messages( consume: &ConsumeCallback, transforms: &Vec<Arc<dyn Transform>>, decoder: &Arc<dyn StreamDecoder>, -) -> Result<(), RuntimeError> { +) -> Result<usize, RuntimeError> { let messages = messages.into_iter().map(|message| ReceivedMessage { id: message.header.id, offset: message.header.offset, @@ -453,6 +472,8 @@ async fn process_messages( }); } + let processed_count = messages.len(); + let topic_meta = postcard::to_allocvec(topic_metadata).map_err(|error| { error!( "Failed to serialize topic metadata for sink connector with ID: {plugin_id}. {error}" @@ -486,5 +507,5 @@ async fn process_messages( messages.len(), ); - Ok(()) + Ok(processed_count) } diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs index fba517989..c4f155b5a 100644 --- a/core/connectors/runtime/src/source.rs +++ b/core/connectors/runtime/src/source.rs @@ -39,6 +39,7 @@ use crate::configs::connectors::SourceConfig; use crate::context::RuntimeContext; use crate::log::LOG_CALLBACK; use crate::manager::status::ConnectorStatus; +use crate::metrics::ConnectorType; use crate::{ PLUGIN_ID, RuntimeError, SourceApi, SourceConnector, SourceConnectorPlugin, SourceConnectorProducer, SourceConnectorWrapper, resolve_plugin_path, @@ -256,7 +257,11 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>, context: Arc<RuntimeContext> context .sources - .update_status(&plugin_key, ConnectorStatus::Running) + .update_status( + &plugin_key, + ConnectorStatus::Running, + Some(&context.metrics), + ) .await; let encoder = producer.encoder.clone(); let producer = &producer.producer; @@ -269,6 +274,9 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>, context: Arc<RuntimeContext> while let Ok(produced_messages) = receiver.recv_async().await { let count = produced_messages.messages.len(); + context + .metrics + .increment_messages_produced(&plugin_key, count as u64); if plugin.verbose { info!("Source connector with ID: {plugin_id} received {count} messages",); } else { @@ -313,6 +321,9 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>, context: Arc<RuntimeContext> producer.topic() ); error!(err); + context + .metrics + .increment_errors(&plugin_key, ConnectorType::Source); context.sources.set_error(&plugin_key, &err).await; continue; }; @@ -324,10 +335,17 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>, context: Arc<RuntimeContext> producer.topic(), ); error!(err); + context + .metrics + .increment_errors(&plugin_key, ConnectorType::Source); context.sources.set_error(&plugin_key, &err).await; continue; } + context + .metrics + .increment_messages_sent(&plugin_key, count as u64); + if plugin.verbose { info!( "Sent {count} messages to stream: {}, topic: {} by source connector with ID: {plugin_id}", @@ -365,7 +383,11 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>, context: Arc<RuntimeContext> info!("Source connector with ID: {plugin_id} stopped."); context .sources - .update_status(&plugin_key, ConnectorStatus::Stopped) + .update_status( + &plugin_key, + ConnectorStatus::Stopped, + Some(&context.metrics), + ) .await; }); } diff --git a/core/connectors/runtime/src/stats.rs b/core/connectors/runtime/src/stats.rs new file mode 100644 index 000000000..3effe2ca6 --- /dev/null +++ b/core/connectors/runtime/src/stats.rs @@ -0,0 
+1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::context::RuntimeContext; +use crate::metrics::ConnectorType; +use iggy_common::IggyTimestamp; +use serde::Serialize; +use std::sync::Arc; +use sysinfo::System; + +#[derive(Debug, Serialize)] +pub struct ConnectorRuntimeStats { + pub process_id: u32, + pub cpu_usage: f32, + pub memory_usage: u64, + pub run_time: u64, + pub start_time: u64, + pub sources_total: u32, + pub sources_running: u32, + pub sinks_total: u32, + pub sinks_running: u32, + pub connectors: Vec<ConnectorStats>, +} + +#[derive(Debug, Serialize)] +pub struct ConnectorStats { + pub key: String, + pub name: String, + pub connector_type: String, + pub status: String, + pub enabled: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub messages_produced: Option<u64>, + #[serde(skip_serializing_if = "Option::is_none")] + pub messages_sent: Option<u64>, + #[serde(skip_serializing_if = "Option::is_none")] + pub messages_consumed: Option<u64>, + #[serde(skip_serializing_if = "Option::is_none")] + pub messages_processed: Option<u64>, + pub errors: u64, +} + +pub async fn get_runtime_stats(context: &Arc<RuntimeContext>) -> ConnectorRuntimeStats { + let pid = std::process::id(); + + let mut system = System::new(); + system.refresh_processes( + sysinfo::ProcessesToUpdate::Some(&[sysinfo::Pid::from_u32(pid)]), + true, + ); + + let (cpu_usage, memory_usage) = system + .process(sysinfo::Pid::from_u32(pid)) + .map(|p| (p.cpu_usage(), p.memory())) + .unwrap_or((0.0, 0)); + + let sources = context.sources.get_all().await; + let sinks = context.sinks.get_all().await; + + let sources_total = context.metrics.get_sources_total(); + let sinks_total = context.metrics.get_sinks_total(); + let sources_running = context.metrics.get_sources_running(); + let sinks_running = context.metrics.get_sinks_running(); + + let mut connectors = Vec::with_capacity(sources.len() + sinks.len()); + for source in &sources { + connectors.push(ConnectorStats { + key: source.key.clone(), + name: source.name.clone(), + connector_type: "source".to_owned(), + status: source.status.to_string(), + enabled: source.enabled, + messages_produced: Some(context.metrics.get_messages_produced(&source.key)), + messages_sent: Some(context.metrics.get_messages_sent(&source.key)), + messages_consumed: None, + messages_processed: None, + errors: context + .metrics + .get_errors(&source.key, ConnectorType::Source), + }); + } + for sink in &sinks { + connectors.push(ConnectorStats { + key: sink.key.clone(), + name: sink.name.clone(), + connector_type: "sink".to_owned(), + status: sink.status.to_string(), + enabled: sink.enabled, + messages_produced: None, + messages_sent: None, + messages_consumed: 
Some(context.metrics.get_messages_consumed(&sink.key)), + messages_processed: Some(context.metrics.get_messages_processed(&sink.key)), + errors: context.metrics.get_errors(&sink.key, ConnectorType::Sink), + }); + } + + let now = IggyTimestamp::now().as_micros(); + let start = context.start_time.as_micros(); + let run_time = now.saturating_sub(start); + + ConnectorRuntimeStats { + process_id: pid, + cpu_usage, + memory_usage, + run_time, + start_time: start, + sources_total, + sources_running, + sinks_total, + sinks_running, + connectors, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_connector_stats_source_serialization() { + let stats = ConnectorStats { + key: "test-source".to_owned(), + name: "Test Source".to_owned(), + connector_type: "source".to_owned(), + status: "running".to_owned(), + enabled: true, + messages_produced: Some(100), + messages_sent: Some(95), + messages_consumed: None, + messages_processed: None, + errors: 5, + }; + + let json = serde_json::to_string(&stats).unwrap(); + assert!(json.contains("\"key\":\"test-source\"")); + assert!(json.contains("\"connector_type\":\"source\"")); + assert!(json.contains("\"messages_produced\":100")); + assert!(json.contains("\"messages_sent\":95")); + // None fields should not be serialized + assert!(!json.contains("messages_consumed")); + assert!(!json.contains("messages_processed")); + assert!(json.contains("\"errors\":5")); + } + + #[test] + fn test_connector_stats_sink_serialization() { + let stats = ConnectorStats { + key: "test-sink".to_owned(), + name: "Test Sink".to_owned(), + connector_type: "sink".to_owned(), + status: "running".to_owned(), + enabled: true, + messages_produced: None, + messages_sent: None, + messages_consumed: Some(200), + messages_processed: Some(195), + errors: 3, + }; + + let json = serde_json::to_string(&stats).unwrap(); + assert!(json.contains("\"key\":\"test-sink\"")); + assert!(json.contains("\"connector_type\":\"sink\"")); + // None fields should not be serialized + assert!(!json.contains("messages_produced")); + assert!(!json.contains("messages_sent")); + assert!(json.contains("\"messages_consumed\":200")); + assert!(json.contains("\"messages_processed\":195")); + assert!(json.contains("\"errors\":3")); + } + + #[test] + fn test_connector_runtime_stats_serialization() { + let stats = ConnectorRuntimeStats { + process_id: 12345, + cpu_usage: 25.5, + memory_usage: 1024000, + run_time: 60000000, + start_time: 1700000000000000, + sources_total: 2, + sources_running: 1, + sinks_total: 3, + sinks_running: 2, + connectors: vec![ + ConnectorStats { + key: "source-1".to_owned(), + name: "Source One".to_owned(), + connector_type: "source".to_owned(), + status: "running".to_owned(), + enabled: true, + messages_produced: Some(100), + messages_sent: Some(100), + messages_consumed: None, + messages_processed: None, + errors: 0, + }, + ConnectorStats { + key: "sink-1".to_owned(), + name: "Sink One".to_owned(), + connector_type: "sink".to_owned(), + status: "stopped".to_owned(), + enabled: false, + messages_produced: None, + messages_sent: None, + messages_consumed: Some(50), + messages_processed: Some(50), + errors: 0, + }, + ], + }; + + let json = serde_json::to_string(&stats).unwrap(); + assert!(json.contains("\"process_id\":12345")); + assert!(json.contains("\"sources_total\":2")); + assert!(json.contains("\"sources_running\":1")); + assert!(json.contains("\"sinks_total\":3")); + assert!(json.contains("\"sinks_running\":2")); + assert!(json.contains("\"connectors\":[")); + 
assert!(json.contains("\"messages_produced\":100")); + assert!(json.contains("\"messages_consumed\":50")); + } + + #[test] + fn test_connector_runtime_stats_empty_connectors() { + let stats = ConnectorRuntimeStats { + process_id: 1, + cpu_usage: 0.0, + memory_usage: 0, + run_time: 0, + start_time: 0, + sources_total: 0, + sources_running: 0, + sinks_total: 0, + sinks_running: 0, + connectors: vec![], + }; + + let json = serde_json::to_string(&stats).unwrap(); + assert!(json.contains("\"connectors\":[]")); + } +} diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index d311bc870..8adfda896 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -84,7 +84,7 @@ opentelemetry-otlp = { workspace = true } opentelemetry-semantic-conventions = { workspace = true } opentelemetry_sdk = { workspace = true } papaya = "0.2.3" -prometheus-client = "0.24.0" +prometheus-client = { workspace = true } rand = { workspace = true } reqwest = { workspace = true, features = ["rustls-tls-no-provider"] } ringbuffer = "0.16.0"
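
For reviewers, a minimal sketch of how the runtime-internal `Metrics` API introduced in `core/connectors/runtime/src/metrics.rs` fits together. The module is crate-private (`pub(crate) mod metrics`), so this is illustrative only; the connector key `"postgres-source"` is a hypothetical example, not a connector from this commit.

```rust
// Sketch (not part of the patch): exercising the new Metrics API from
// inside the connectors runtime crate.
use crate::metrics::{ConnectorType, Metrics};

fn metrics_sketch() {
    let metrics = Metrics::init();

    // Runtime gauges: totals are set once at startup from the configured
    // connectors; running counts move with connector status transitions.
    metrics.set_sources_total(2);
    metrics.set_sinks_total(1);
    metrics.increment_sources_running();

    // Per-connector counters, labeled with connector_key and connector_type.
    metrics.increment_messages_produced("postgres-source", 10);
    metrics.increment_messages_sent("postgres-source", 10);
    metrics.increment_errors("postgres-source", ConnectorType::Source);

    assert_eq!(metrics.get_messages_produced("postgres-source"), 10);

    // Prometheus text exposition, as served by GET /metrics when enabled.
    let body = metrics.get_formatted_output();
    assert!(body.contains("iggy_connectors_sources_total 2"));
    assert!(body.contains("iggy_connector_messages_produced_total"));
}
```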
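
And a sketch of querying the two new endpoints from a client. Assumptions not taken from the diff: the runtime's HTTP API listens on `localhost:8080`, no API key is configured, `http.metrics.enabled = true` with the default `/metrics` endpoint, and the client crate depends on `tokio`, `serde_json`, and `reqwest` with its `json` feature.

```rust
// Sketch: hitting the new /stats and /metrics endpoints of the connectors
// runtime; address and auth are deployment-specific assumptions.
use serde_json::Value;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // JSON runtime statistics: process info, gauges, per-connector counters.
    let stats: Value = reqwest::get("http://localhost:8080/stats")
        .await?
        .json()
        .await?;
    println!("sources running: {}", stats["sources_running"]);

    // Prometheus text format, only routed when http.metrics.enabled is true.
    let metrics = reqwest::get("http://localhost:8080/metrics")
        .await?
        .text()
        .await?;
    println!("{metrics}");
    Ok(())
}
```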
