This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 76e928a44 feat(connectors): add Prometheus metrics and stats endpoints (#2633)
76e928a44 is described below
commit 76e928a4450a0c7a34fee78bb77ee20c8e9deda7
Author: Maciej Modzelewski <[email protected]>
AuthorDate: Wed Jan 28 22:29:07 2026 +0100
feat(connectors): add Prometheus metrics and stats endpoints (#2633)
- Add /metrics endpoint for Prometheus scraping (configurable)
- Add /stats endpoint returning JSON runtime statistics
- Track runtime gauges: sources/sinks total and running counts
- Track per-connector counters: messages produced/sent (source),
consumed/processed (sink), and errors
- Add cpu/memory usage and uptime to stats response
---
Cargo.lock | 4 +-
Cargo.toml | 1 +
DEPENDENCIES.md | 2 +-
core/connectors/runtime/Cargo.toml | 4 +-
core/connectors/runtime/README.md | 25 ++
core/connectors/runtime/config.toml | 4 +
core/connectors/runtime/runtime.http | 6 +
core/connectors/runtime/src/api/config.rs | 31 +-
core/connectors/runtime/src/api/mod.rs | 22 +-
core/connectors/runtime/src/context.rs | 17 +-
core/connectors/runtime/src/main.rs | 2 +
core/connectors/runtime/src/manager/sink.rs | 18 +-
core/connectors/runtime/src/manager/source.rs | 18 +-
core/connectors/runtime/src/metrics.rs | 478 +++++++++++++++++++++
core/connectors/runtime/src/sink.rs | 39 +-
core/connectors/runtime/src/source.rs | 26 +-
core/connectors/runtime/src/stats.rs | 129 ++++++
core/integration/tests/connectors/api/config.toml | 29 ++
.../tests/connectors/api/connectors/.gitkeep | 0
core/integration/tests/connectors/api/endpoints.rs | 217 ++++++++++
core/integration/tests/connectors/api/mod.rs | 29 ++
core/integration/tests/connectors/mod.rs | 1 +
core/server/Cargo.toml | 2 +-
23 files changed, 1082 insertions(+), 22 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 5b992c303..65a3c1509 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4536,7 +4536,7 @@ dependencies = [
[[package]]
name = "iggy-connectors"
-version = "0.2.1-edge.5"
+version = "0.2.1-edge.6"
dependencies = [
"async-trait",
"axum",
@@ -4565,6 +4565,7 @@ dependencies = [
"opentelemetry-semantic-conventions",
"opentelemetry_sdk",
"postcard",
+ "prometheus-client",
"reqwest",
"reqwest-middleware",
"reqwest-retry",
@@ -4574,6 +4575,7 @@ dependencies = [
"serde_with",
"serde_yaml_ng",
"strum",
+ "sysinfo 0.38.0",
"tempfile",
"thiserror 2.0.18",
"tokio",
diff --git a/Cargo.toml b/Cargo.toml
index e2aba3ca2..e6797945c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -179,6 +179,7 @@ passterm = "=2.0.1"
postcard = { version = "1.1.3", features = ["alloc"] }
predicates = "3.1.3"
proc-macro2 = "1"
+prometheus-client = "0.24.0"
quinn = "0.11.9"
quote = "1"
rand = "0.9.2"
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 94277cfcc..b5861ebea 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -389,7 +389,7 @@ iggy: 0.8.1-edge.7, "Apache-2.0",
iggy-bench: 0.3.1-edge.2, "Apache-2.0",
iggy-bench-dashboard-server: 0.5.1-edge.1, "Apache-2.0",
iggy-cli: 0.10.1-edge.1, "Apache-2.0",
-iggy-connectors: 0.2.1-edge.5, "Apache-2.0",
+iggy-connectors: 0.2.1-edge.6, "Apache-2.0",
iggy-mcp: 0.2.1-edge.5, "Apache-2.0",
iggy_binary_protocol: 0.8.1-edge.3, "Apache-2.0",
iggy_common: 0.8.1-edge.2, "Apache-2.0",
diff --git a/core/connectors/runtime/Cargo.toml b/core/connectors/runtime/Cargo.toml
index 970c1a7e8..26c6fc9d3 100644
--- a/core/connectors/runtime/Cargo.toml
+++ b/core/connectors/runtime/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy-connectors"
-version = "0.2.1-edge.5"
+version = "0.2.1-edge.6"
description = "Connectors runtime for Iggy message streaming platform"
edition = "2024"
license = "Apache-2.0"
@@ -56,6 +56,7 @@ opentelemetry-otlp = { workspace = true }
opentelemetry-semantic-conventions = { workspace = true }
opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] }
postcard = { workspace = true }
+prometheus-client = { workspace = true }
reqwest = { workspace = true }
reqwest-middleware = { workspace = true }
reqwest-retry = { workspace = true }
@@ -65,6 +66,7 @@ serde_json = { workspace = true }
serde_with = { workspace = true }
serde_yaml_ng = { workspace = true }
strum = { workspace = true }
+sysinfo = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
toml = { workspace = true }
diff --git a/core/connectors/runtime/README.md b/core/connectors/runtime/README.md
index 8b0d2584c..6433fb822 100644
--- a/core/connectors/runtime/README.md
+++ b/core/connectors/runtime/README.md
@@ -134,6 +134,10 @@ exposed_headers = [""]
allow_credentials = false
allow_private_network = false
+[http.metrics] # Optional Prometheus metrics configuration
+enabled = false
+endpoint = "/metrics"
+
[http.tls] # Optional TLS configuration for HTTP API
enabled = false
cert_file = "core/certs/iggy_cert.pem"
@@ -144,6 +148,8 @@ Currently, it does expose the following endpoints:
- `GET /`: welcome message.
- `GET /health`: health status of the runtime.
+- `GET /stats`: runtime statistics including process info, memory/CPU usage, and connector status.
+- `GET /metrics`: Prometheus-formatted metrics (when `http.metrics.enabled` is `true`).
- `GET /sinks`: list of sinks.
- `GET /sinks/{key}`: sink details.
- `GET /sinks/{key}/configs`: list of configuration versions for the sink.
@@ -180,3 +186,22 @@ endpoint = "http://localhost:4317"
transport = "grpc" # Options: "grpc", "http"
endpoint = "http://localhost:4317"
```
+
+## Metrics
+
+The runtime exposes Prometheus-compatible metrics via the `/metrics` endpoint when enabled. The following metrics are available:
+
+### Runtime Gauges
+
+- `iggy_connectors_sources_total`: Total configured source connectors
+- `iggy_connectors_sources_running`: Sources currently in Running status
+- `iggy_connectors_sinks_total`: Total configured sink connectors
+- `iggy_connectors_sinks_running`: Sinks currently in Running status
+
+### Per-Connector Counters (labeled with `connector_key` and `connector_type`)
+
+- `iggy_connector_messages_produced_total`: Messages received from source plugin
+- `iggy_connector_messages_sent_total`: Messages sent to Iggy (source)
+- `iggy_connector_messages_consumed_total`: Messages consumed from Iggy (sink)
+- `iggy_connector_messages_processed_total`: Messages processed and sent to sink plugin
+- `iggy_connector_errors_total`: Errors encountered
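
For a quick check of the two endpoints documented above, any HTTP client works; the commands below are an illustrative sketch rather than part of the diff — the address is a placeholder for whatever `http.address` the runtime is configured with, and when an `api_key` is set both routes expect the `api-key` header:

    curl -H "api-key: <your-api-key>" http://localhost:8081/stats
    curl -H "api-key: <your-api-key>" http://localhost:8081/metrics
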
diff --git a/core/connectors/runtime/config.toml b/core/connectors/runtime/config.toml
index abfa1ab50..247a67e6b 100644
--- a/core/connectors/runtime/config.toml
+++ b/core/connectors/runtime/config.toml
@@ -29,6 +29,10 @@ exposed_headers = [""]
allow_credentials = false
allow_private_network = false
+[http.metrics] # Optional Prometheus metrics configuration
+enabled = false
+endpoint = "/metrics"
+
[http.tls] # Optional TLS configuration for HTTP API
enabled = false
cert_file = "core/certs/iggy_cert.pem"
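
As a configuration sketch (values are examples, not the shipped defaults), enabling the exporter and serving it from a custom path only requires overriding the new section:

    [http.metrics]
    enabled = true
    endpoint = "/internal/metrics"
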
diff --git a/core/connectors/runtime/runtime.http b/core/connectors/runtime/runtime.http
index 7c08bef4f..3205c7b8b 100644
--- a/core/connectors/runtime/runtime.http
+++ b/core/connectors/runtime/runtime.http
@@ -27,6 +27,12 @@ GET {{url}}
###
GET {{url}}/health
+###
+GET {{url}}/stats
+
+###
+GET {{url}}/metrics
+
###
GET {{url}}/sinks
diff --git a/core/connectors/runtime/src/api/config.rs b/core/connectors/runtime/src/api/config.rs
index 8efdb8ae5..2b5bc009a 100644
--- a/core/connectors/runtime/src/api/config.rs
+++ b/core/connectors/runtime/src/api/config.rs
@@ -39,6 +39,13 @@ pub struct HttpConfig {
pub api_key: String,
pub cors: HttpCorsConfig,
pub tls: HttpTlsConfig,
+ pub metrics: HttpMetricsConfig,
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize, ConfigEnv)]
+pub struct HttpMetricsConfig {
+ pub enabled: bool,
+ pub endpoint: String,
}
#[derive(Debug, Default, Deserialize, Serialize, Clone, ConfigEnv)]
@@ -135,6 +142,25 @@ pub fn configure_cors(config: &HttpCorsConfig) -> CorsLayer {
.allow_private_network(config.allow_private_network)
}
+impl Default for HttpMetricsConfig {
+ fn default() -> Self {
+ Self {
+ enabled: false,
+ endpoint: "/metrics".to_owned(),
+ }
+ }
+}
+
+impl std::fmt::Display for HttpMetricsConfig {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "{{ enabled: {}, endpoint: {} }}",
+ self.enabled, self.endpoint
+ )
+ }
+}
+
impl Default for HttpConfig {
fn default() -> Self {
Self {
@@ -143,6 +169,7 @@ impl Default for HttpConfig {
api_key: "".to_owned(),
cors: HttpCorsConfig::default(),
tls: HttpTlsConfig::default(),
+ metrics: HttpMetricsConfig::default(),
}
}
}
@@ -151,8 +178,8 @@ impl std::fmt::Display for HttpConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
- "{{ address: {}, api_key: {}, cors: {}, tls: {} }}",
- self.address, self.api_key, self.cors, self.tls
+ "{{ address: {}, api_key: {}, cors: {}, tls: {}, metrics: {} }}",
+ self.address, self.api_key, self.cors, self.tls, self.metrics
)
}
}
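
A hypothetical unit check (not part of this commit) makes the defaults above explicit — metrics stay disabled unless opted in, and the path falls back to `/metrics`; it assumes access to `HttpMetricsConfig` from the config module:

    #[test]
    fn http_metrics_config_defaults() {
        let config = HttpMetricsConfig::default();
        // Exporter is opt-in.
        assert!(!config.enabled);
        // Default scrape path.
        assert_eq!(config.endpoint, "/metrics");
        // Display impl added in this change.
        assert_eq!(config.to_string(), "{ enabled: false, endpoint: /metrics }");
    }
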
diff --git a/core/connectors/runtime/src/api/mod.rs b/core/connectors/runtime/src/api/mod.rs
index 85b2d4400..d8dea6b82 100644
--- a/core/connectors/runtime/src/api/mod.rs
+++ b/core/connectors/runtime/src/api/mod.rs
@@ -18,8 +18,9 @@
*/
use crate::context::RuntimeContext;
+use crate::stats;
use auth::resolve_api_key;
-use axum::{Json, Router, middleware, routing::get};
+use axum::{Json, Router, extract::State, middleware, routing::get};
use axum_server::tls_rustls::RustlsConfig;
use config::{HttpConfig, configure_cors};
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
@@ -41,12 +42,21 @@ pub async fn init(config: &HttpConfig, context: Arc<RuntimeContext>) {
return;
}
+ let mut system_router = Router::new().route("/stats", get(get_stats));
+
+ if config.metrics.enabled {
+ system_router = system_router.route(&config.metrics.endpoint, get(get_metrics));
+ }
+
+ let system_router = system_router.with_state(context.clone());
+
let mut app = Router::new()
.route("/", get(|| async { "Connector Runtime API" }))
.route(
"/health",
get(|| async { Json(serde_json::json!({ "status": "healthy" })) }),
)
+ .merge(system_router)
.merge(sink::router(context.clone()))
.merge(source::router(context.clone()));
@@ -111,3 +121,13 @@ pub async fn init(config: &HttpConfig, context: Arc<RuntimeContext>) {
}
});
}
+
+async fn get_metrics(State(context): State<Arc<RuntimeContext>>) -> String {
+ context.metrics.get_formatted_output()
+}
+
+async fn get_stats(
+ State(context): State<Arc<RuntimeContext>>,
+) -> Json<stats::ConnectorRuntimeStats> {
+ Json(stats::get_runtime_stats(&context).await)
+}
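
For readers less familiar with axum, the conditional registration above boils down to the following self-contained sketch; the type and field names are illustrative stand-ins, and the API-key middleware is omitted:

    use std::sync::Arc;
    use axum::{Json, Router, extract::State, routing::get};

    // Stand-in for the runtime's shared context.
    struct DemoContext {
        metrics_enabled: bool,
        metrics_endpoint: String,
    }

    async fn get_stats(State(_ctx): State<Arc<DemoContext>>) -> Json<serde_json::Value> {
        Json(serde_json::json!({ "status": "ok" }))
    }

    async fn get_metrics(State(_ctx): State<Arc<DemoContext>>) -> String {
        // A real handler would render the prometheus-client registry here.
        String::new()
    }

    fn build_router(ctx: Arc<DemoContext>) -> Router {
        // The metrics route is only added when enabled, mirroring the change above.
        let mut system = Router::new().route("/stats", get(get_stats));
        if ctx.metrics_enabled {
            system = system.route(&ctx.metrics_endpoint, get(get_metrics));
        }
        Router::new()
            .route("/health", get(|| async { "healthy" }))
            .merge(system.with_state(ctx))
    }
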
diff --git a/core/connectors/runtime/src/context.rs b/core/connectors/runtime/src/context.rs
index fe2a8e034..20d9906ca 100644
--- a/core/connectors/runtime/src/context.rs
+++ b/core/connectors/runtime/src/context.rs
@@ -19,6 +19,7 @@
use crate::configs::connectors::{ConnectorsConfigProvider, SinkConfig, SourceConfig};
use crate::configs::runtime::ConnectorsRuntimeConfig;
use crate::manager::status::ConnectorError;
+use crate::metrics::Metrics;
use crate::{
SinkConnectorWrapper, SourceConnectorWrapper,
manager::{
@@ -27,6 +28,7 @@ use crate::{
status::ConnectorStatus,
},
};
+use iggy_common::IggyTimestamp;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::error;
@@ -36,6 +38,8 @@ pub struct RuntimeContext {
pub sources: SourceManager,
pub api_key: String,
pub config_provider: Arc<dyn ConnectorsConfigProvider>,
+ pub metrics: Arc<Metrics>,
+ pub start_time: IggyTimestamp,
}
pub fn init(
@@ -46,11 +50,20 @@ pub fn init(
source_wrappers: &[SourceConnectorWrapper],
config_provider: Box<dyn ConnectorsConfigProvider>,
) -> RuntimeContext {
+ let metrics = Arc::new(Metrics::init());
+ let sinks = SinkManager::new(map_sinks(sinks_config, sink_wrappers));
+ let sources = SourceManager::new(map_sources(sources_config, source_wrappers));
+
+ metrics.set_sinks_total(sinks_config.len() as u32);
+ metrics.set_sources_total(sources_config.len() as u32);
+
RuntimeContext {
- sinks: SinkManager::new(map_sinks(sinks_config, sink_wrappers)),
- sources: SourceManager::new(map_sources(sources_config, source_wrappers)),
+ sinks,
+ sources,
api_key: config.http.api_key.to_owned(),
config_provider: Arc::from(config_provider),
+ metrics,
+ start_time: IggyTimestamp::now(),
}
}
diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs
index 657ed646a..b71bd5eef 100644
--- a/core/connectors/runtime/src/main.rs
+++ b/core/connectors/runtime/src/main.rs
@@ -48,9 +48,11 @@ pub(crate) mod context;
pub(crate) mod error;
mod log;
mod manager;
+pub(crate) mod metrics;
mod sink;
mod source;
mod state;
+pub(crate) mod stats;
mod stream;
mod transform;
diff --git a/core/connectors/runtime/src/manager/sink.rs b/core/connectors/runtime/src/manager/sink.rs
index 93ad280bd..495693716 100644
--- a/core/connectors/runtime/src/manager/sink.rs
+++ b/core/connectors/runtime/src/manager/sink.rs
@@ -18,6 +18,7 @@
*/
use super::status::{ConnectorError, ConnectorStatus};
use crate::configs::connectors::{ConfigFormat, SinkConfig};
+use crate::metrics::Metrics;
use dashmap::DashMap;
use std::collections::HashMap;
use std::sync::Arc;
@@ -63,13 +64,28 @@ impl SinkManager {
results
}
- pub async fn update_status(&self, key: &str, status: ConnectorStatus) {
+ pub async fn update_status(
+ &self,
+ key: &str,
+ status: ConnectorStatus,
+ metrics: Option<&Arc<Metrics>>,
+ ) {
if let Some(sink) = self.sinks.get(key) {
let mut sink = sink.lock().await;
+ let old_status = sink.info.status;
sink.info.status = status;
if matches!(status, ConnectorStatus::Running | ConnectorStatus::Stopped) {
sink.info.last_error = None;
}
+ if let Some(metrics) = metrics {
+ if old_status != ConnectorStatus::Running && status == ConnectorStatus::Running {
+ metrics.increment_sinks_running();
+ } else if old_status == ConnectorStatus::Running
+ && status != ConnectorStatus::Running
+ {
+ metrics.decrement_sinks_running();
+ }
+ }
}
}
diff --git a/core/connectors/runtime/src/manager/source.rs b/core/connectors/runtime/src/manager/source.rs
index 801a567e7..051668640 100644
--- a/core/connectors/runtime/src/manager/source.rs
+++ b/core/connectors/runtime/src/manager/source.rs
@@ -18,6 +18,7 @@
*/
use super::status::{ConnectorError, ConnectorStatus};
use crate::configs::connectors::{ConfigFormat, SourceConfig};
+use crate::metrics::Metrics;
use dashmap::DashMap;
use std::collections::HashMap;
use std::sync::Arc;
@@ -63,13 +64,28 @@ impl SourceManager {
results
}
- pub async fn update_status(&self, key: &str, status: ConnectorStatus) {
+ pub async fn update_status(
+ &self,
+ key: &str,
+ status: ConnectorStatus,
+ metrics: Option<&Arc<Metrics>>,
+ ) {
if let Some(source) = self.sources.get(key) {
let mut source = source.lock().await;
+ let old_status = source.info.status;
source.info.status = status;
if matches!(status, ConnectorStatus::Running | ConnectorStatus::Stopped) {
source.info.last_error = None;
}
+ if let Some(metrics) = metrics {
+ if old_status != ConnectorStatus::Running && status ==
ConnectorStatus::Running {
+ metrics.increment_sources_running();
+ } else if old_status == ConnectorStatus::Running
+ && status != ConnectorStatus::Running
+ {
+ metrics.decrement_sources_running();
+ }
+ }
}
}
diff --git a/core/connectors/runtime/src/metrics.rs b/core/connectors/runtime/src/metrics.rs
new file mode 100644
index 000000000..8d9c29cdb
--- /dev/null
+++ b/core/connectors/runtime/src/metrics.rs
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use prometheus_client::encoding::text::encode;
+use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue};
+use prometheus_client::metrics::counter::Counter;
+use prometheus_client::metrics::family::Family;
+use prometheus_client::metrics::gauge::Gauge;
+use prometheus_client::registry::Registry;
+use std::sync::Arc;
+use tracing::error;
+
+#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
+pub struct ConnectorLabels {
+ pub connector_key: String,
+ pub connector_type: ConnectorType,
+}
+
+#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)]
+pub enum ConnectorType {
+ Source,
+ Sink,
+}
+
+#[derive(Debug, Clone)]
+pub struct Metrics {
+ registry: Arc<Registry>,
+ sources_total: Gauge,
+ sources_running: Gauge,
+ sinks_total: Gauge,
+ sinks_running: Gauge,
+ messages_produced: Family<ConnectorLabels, Counter>,
+ messages_sent: Family<ConnectorLabels, Counter>,
+ messages_consumed: Family<ConnectorLabels, Counter>,
+ messages_processed: Family<ConnectorLabels, Counter>,
+ errors: Family<ConnectorLabels, Counter>,
+}
+
+impl Metrics {
+ pub fn init() -> Self {
+ let mut registry = Registry::default();
+
+ let sources_total = Gauge::default();
+ let sources_running = Gauge::default();
+ let sinks_total = Gauge::default();
+ let sinks_running = Gauge::default();
+ let messages_produced = Family::<ConnectorLabels, Counter>::default();
+ let messages_sent = Family::<ConnectorLabels, Counter>::default();
+ let messages_consumed = Family::<ConnectorLabels, Counter>::default();
+ let messages_processed = Family::<ConnectorLabels, Counter>::default();
+ let errors = Family::<ConnectorLabels, Counter>::default();
+
+ registry.register(
+ "iggy_connectors_sources_total",
+ "Total configured source connectors",
+ sources_total.clone(),
+ );
+ registry.register(
+ "iggy_connectors_sources_running",
+ "Sources in Running status",
+ sources_running.clone(),
+ );
+ registry.register(
+ "iggy_connectors_sinks_total",
+ "Total configured sink connectors",
+ sinks_total.clone(),
+ );
+ registry.register(
+ "iggy_connectors_sinks_running",
+ "Sinks in Running status",
+ sinks_running.clone(),
+ );
+ registry.register(
+ "iggy_connector_messages_produced_total",
+ "Messages received from source plugin poll",
+ messages_produced.clone(),
+ );
+ registry.register(
+ "iggy_connector_messages_sent_total",
+ "Messages sent to Iggy (source)",
+ messages_sent.clone(),
+ );
+ registry.register(
+ "iggy_connector_messages_consumed_total",
+ "Messages consumed from Iggy (sink)",
+ messages_consumed.clone(),
+ );
+ registry.register(
+ "iggy_connector_messages_processed_total",
+ "Messages processed and sent to sink plugin",
+ messages_processed.clone(),
+ );
+ registry.register(
+ "iggy_connector_errors_total",
+ "Errors encountered",
+ errors.clone(),
+ );
+
+ Self {
+ registry: Arc::new(registry),
+ sources_total,
+ sources_running,
+ sinks_total,
+ sinks_running,
+ messages_produced,
+ messages_sent,
+ messages_consumed,
+ messages_processed,
+ errors,
+ }
+ }
+
+ pub fn get_formatted_output(&self) -> String {
+ let mut buffer = String::new();
+ if let Err(err) = encode(&mut buffer, &self.registry) {
+ error!("Failed to encode metrics: {}", err);
+ }
+ buffer
+ }
+
+ pub fn set_sources_total(&self, count: u32) {
+ self.sources_total.set(count as i64);
+ }
+
+ pub fn set_sinks_total(&self, count: u32) {
+ self.sinks_total.set(count as i64);
+ }
+
+ pub fn get_sources_total(&self) -> u32 {
+ self.sources_total.get() as u32
+ }
+
+ pub fn get_sinks_total(&self) -> u32 {
+ self.sinks_total.get() as u32
+ }
+
+ pub fn get_sources_running(&self) -> u32 {
+ self.sources_running.get() as u32
+ }
+
+ pub fn get_sinks_running(&self) -> u32 {
+ self.sinks_running.get() as u32
+ }
+
+ pub fn increment_sources_running(&self) {
+ self.sources_running.inc();
+ }
+
+ pub fn decrement_sources_running(&self) {
+ self.sources_running.dec();
+ }
+
+ pub fn increment_sinks_running(&self) {
+ self.sinks_running.inc();
+ }
+
+ pub fn decrement_sinks_running(&self) {
+ self.sinks_running.dec();
+ }
+
+ pub fn increment_messages_produced(&self, key: &str, count: u64) {
+ self.messages_produced
+ .get_or_create(&ConnectorLabels {
+ connector_key: key.to_owned(),
+ connector_type: ConnectorType::Source,
+ })
+ .inc_by(count);
+ }
+
+ pub fn increment_messages_sent(&self, key: &str, count: u64) {
+ self.messages_sent
+ .get_or_create(&ConnectorLabels {
+ connector_key: key.to_owned(),
+ connector_type: ConnectorType::Source,
+ })
+ .inc_by(count);
+ }
+
+ pub fn increment_messages_consumed(&self, key: &str, count: u64) {
+ self.messages_consumed
+ .get_or_create(&ConnectorLabels {
+ connector_key: key.to_owned(),
+ connector_type: ConnectorType::Sink,
+ })
+ .inc_by(count);
+ }
+
+ pub fn increment_messages_processed(&self, key: &str, count: u64) {
+ self.messages_processed
+ .get_or_create(&ConnectorLabels {
+ connector_key: key.to_owned(),
+ connector_type: ConnectorType::Sink,
+ })
+ .inc_by(count);
+ }
+
+ pub fn increment_errors(&self, key: &str, connector_type: ConnectorType) {
+ self.errors
+ .get_or_create(&ConnectorLabels {
+ connector_key: key.to_owned(),
+ connector_type,
+ })
+ .inc();
+ }
+
+ pub fn get_messages_produced(&self, key: &str) -> u64 {
+ self.messages_produced
+ .get_or_create(&ConnectorLabels {
+ connector_key: key.to_owned(),
+ connector_type: ConnectorType::Source,
+ })
+ .get()
+ }
+
+ pub fn get_messages_sent(&self, key: &str) -> u64 {
+ self.messages_sent
+ .get_or_create(&ConnectorLabels {
+ connector_key: key.to_owned(),
+ connector_type: ConnectorType::Source,
+ })
+ .get()
+ }
+
+ pub fn get_messages_consumed(&self, key: &str) -> u64 {
+ self.messages_consumed
+ .get_or_create(&ConnectorLabels {
+ connector_key: key.to_owned(),
+ connector_type: ConnectorType::Sink,
+ })
+ .get()
+ }
+
+ pub fn get_messages_processed(&self, key: &str) -> u64 {
+ self.messages_processed
+ .get_or_create(&ConnectorLabels {
+ connector_key: key.to_owned(),
+ connector_type: ConnectorType::Sink,
+ })
+ .get()
+ }
+
+ pub fn get_errors(&self, key: &str, connector_type: ConnectorType) -> u64 {
+ self.errors
+ .get_or_create(&ConnectorLabels {
+ connector_key: key.to_owned(),
+ connector_type,
+ })
+ .get()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_metrics_init() {
+ let metrics = Metrics::init();
+ let output = metrics.get_formatted_output();
+
+ assert!(output.contains("iggy_connectors_sources_total"));
+ assert!(output.contains("iggy_connectors_sources_running"));
+ assert!(output.contains("iggy_connectors_sinks_total"));
+ assert!(output.contains("iggy_connectors_sinks_running"));
+ }
+
+ #[test]
+ fn test_set_sources_total() {
+ let metrics = Metrics::init();
+ metrics.set_sources_total(5);
+
+ let output = metrics.get_formatted_output();
+ assert!(output.contains("iggy_connectors_sources_total 5"));
+ }
+
+ #[test]
+ fn test_set_sinks_total() {
+ let metrics = Metrics::init();
+ metrics.set_sinks_total(3);
+
+ let output = metrics.get_formatted_output();
+ assert!(output.contains("iggy_connectors_sinks_total 3"));
+ }
+
+ #[test]
+ fn test_increment_sources_running() {
+ let metrics = Metrics::init();
+ metrics.increment_sources_running();
+ metrics.increment_sources_running();
+
+ let output = metrics.get_formatted_output();
+ assert!(output.contains("iggy_connectors_sources_running 2"));
+ }
+
+ #[test]
+ fn test_decrement_sources_running() {
+ let metrics = Metrics::init();
+ metrics.increment_sources_running();
+ metrics.increment_sources_running();
+ metrics.decrement_sources_running();
+
+ let output = metrics.get_formatted_output();
+ assert!(output.contains("iggy_connectors_sources_running 1"));
+ }
+
+ #[test]
+ fn test_increment_sinks_running() {
+ let metrics = Metrics::init();
+ metrics.increment_sinks_running();
+
+ let output = metrics.get_formatted_output();
+ assert!(output.contains("iggy_connectors_sinks_running 1"));
+ }
+
+ #[test]
+ fn test_decrement_sinks_running() {
+ let metrics = Metrics::init();
+ metrics.increment_sinks_running();
+ metrics.increment_sinks_running();
+ metrics.decrement_sinks_running();
+
+ let output = metrics.get_formatted_output();
+ assert!(output.contains("iggy_connectors_sinks_running 1"));
+ }
+
+ #[test]
+ fn test_increment_messages_produced() {
+ let metrics = Metrics::init();
+ metrics.increment_messages_produced("test-source", 10);
+
+ let output = metrics.get_formatted_output();
+ assert!(output.contains("iggy_connector_messages_produced_total"));
+ assert!(output.contains("connector_key=\"test-source\""));
+ assert!(output.contains("connector_type=\"Source\""));
+ }
+
+ #[test]
+ fn test_increment_messages_sent() {
+ let metrics = Metrics::init();
+ metrics.increment_messages_sent("test-source", 5);
+
+ let output = metrics.get_formatted_output();
+ assert!(output.contains("iggy_connector_messages_sent_total"));
+ assert!(output.contains("connector_key=\"test-source\""));
+ }
+
+ #[test]
+ fn test_increment_messages_consumed() {
+ let metrics = Metrics::init();
+ metrics.increment_messages_consumed("test-sink", 20);
+
+ let output = metrics.get_formatted_output();
+ assert!(output.contains("iggy_connector_messages_consumed_total"));
+ assert!(output.contains("connector_key=\"test-sink\""));
+ assert!(output.contains("connector_type=\"Sink\""));
+ }
+
+ #[test]
+ fn test_increment_messages_processed() {
+ let metrics = Metrics::init();
+ metrics.increment_messages_processed("test-sink", 15);
+
+ let output = metrics.get_formatted_output();
+ assert!(output.contains("iggy_connector_messages_processed_total"));
+ assert!(output.contains("connector_key=\"test-sink\""));
+ }
+
+ #[test]
+ fn test_increment_errors_source() {
+ let metrics = Metrics::init();
+ metrics.increment_errors("test-source", ConnectorType::Source);
+
+ let output = metrics.get_formatted_output();
+ assert!(output.contains("iggy_connector_errors_total"));
+ assert!(output.contains("connector_key=\"test-source\""));
+ assert!(output.contains("connector_type=\"Source\""));
+ }
+
+ #[test]
+ fn test_increment_errors_sink() {
+ let metrics = Metrics::init();
+ metrics.increment_errors("test-sink", ConnectorType::Sink);
+
+ let output = metrics.get_formatted_output();
+ assert!(output.contains("iggy_connector_errors_total"));
+ assert!(output.contains("connector_key=\"test-sink\""));
+ assert!(output.contains("connector_type=\"Sink\""));
+ }
+
+ #[test]
+ fn test_multiple_connectors() {
+ let metrics = Metrics::init();
+ metrics.increment_messages_produced("source-1", 10);
+ metrics.increment_messages_produced("source-2", 20);
+ metrics.increment_messages_consumed("sink-1", 15);
+
+ let output = metrics.get_formatted_output();
+ assert!(output.contains("connector_key=\"source-1\""));
+ assert!(output.contains("connector_key=\"source-2\""));
+ assert!(output.contains("connector_key=\"sink-1\""));
+ }
+
+ #[test]
+ fn test_cumulative_counts() {
+ let metrics = Metrics::init();
+ metrics.increment_messages_produced("test-source", 10);
+ metrics.increment_messages_produced("test-source", 5);
+
+ let output = metrics.get_formatted_output();
+ // Verify cumulative behavior - the same key should accumulate
+ assert!(output.contains("iggy_connector_messages_produced_total"));
+ assert!(output.contains("connector_key=\"test-source\""));
+ // Check the value is 15 (10 + 5)
+ assert!(output.contains("} 15"));
+ }
+
+ #[test]
+ fn test_get_messages_produced() {
+ let metrics = Metrics::init();
+ metrics.increment_messages_produced("test-source", 10);
+ metrics.increment_messages_produced("test-source", 5);
+
+ assert_eq!(metrics.get_messages_produced("test-source"), 15);
+ assert_eq!(metrics.get_messages_produced("nonexistent"), 0);
+ }
+
+ #[test]
+ fn test_get_messages_sent() {
+ let metrics = Metrics::init();
+ metrics.increment_messages_sent("test-source", 20);
+
+ assert_eq!(metrics.get_messages_sent("test-source"), 20);
+ }
+
+ #[test]
+ fn test_get_messages_consumed() {
+ let metrics = Metrics::init();
+ metrics.increment_messages_consumed("test-sink", 30);
+
+ assert_eq!(metrics.get_messages_consumed("test-sink"), 30);
+ }
+
+ #[test]
+ fn test_get_messages_processed() {
+ let metrics = Metrics::init();
+ metrics.increment_messages_processed("test-sink", 25);
+
+ assert_eq!(metrics.get_messages_processed("test-sink"), 25);
+ }
+
+ #[test]
+ fn test_get_errors() {
+ let metrics = Metrics::init();
+ metrics.increment_errors("test-source", ConnectorType::Source);
+ metrics.increment_errors("test-source", ConnectorType::Source);
+ metrics.increment_errors("test-sink", ConnectorType::Sink);
+
+ assert_eq!(metrics.get_errors("test-source", ConnectorType::Source),
2);
+ assert_eq!(metrics.get_errors("test-sink", ConnectorType::Sink), 1);
+ assert_eq!(metrics.get_errors("nonexistent", ConnectorType::Source),
0);
+ }
+}
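
When the registry above is rendered by prometheus-client's text encoder, the runtime gauges come out roughly as below; counters additionally carry the `connector_key`/`connector_type` labels, and the exact framing (`# HELP` lines, trailing `# EOF`) follows the encoder's OpenMetrics output rather than this sketch:

    # TYPE iggy_connectors_sources_total gauge
    iggy_connectors_sources_total 2
    # TYPE iggy_connectors_sinks_running gauge
    iggy_connectors_sinks_running 1
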
diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs
index 40b41d886..0987eb7e5 100644
--- a/core/connectors/runtime/src/sink.rs
+++ b/core/connectors/runtime/src/sink.rs
@@ -21,6 +21,7 @@ use crate::configs::connectors::SinkConfig;
use crate::context::RuntimeContext;
use crate::log::LOG_CALLBACK;
use crate::manager::status::ConnectorStatus;
+use crate::metrics::{ConnectorType, Metrics};
use crate::{
PLUGIN_ID, RuntimeError, SinkApi, SinkConnector, SinkConnectorConsumer,
SinkConnectorPlugin,
SinkConnectorWrapper, resolve_plugin_path, transform,
@@ -196,7 +197,11 @@ pub fn consume(sinks: Vec<SinkConnectorWrapper>, context: Arc<RuntimeContext>) {
tokio::spawn(async move {
context
.sinks
- .update_status(&plugin_key, ConnectorStatus::Running)
+ .update_status(
+ &plugin_key,
+ ConnectorStatus::Running,
+ Some(&context.metrics),
+ )
.await;
if let Err(error) = consume_messages(
@@ -207,6 +212,8 @@ pub fn consume(sinks: Vec<SinkConnectorWrapper>, context: Arc<RuntimeContext>) {
consumer.transforms,
consumer.consumer,
plugin.verbose,
+ &plugin_key,
+ &context.metrics,
)
.await
{
@@ -215,6 +222,9 @@ pub fn consume(sinks: Vec<SinkConnectorWrapper>, context: Arc<RuntimeContext>) {
plugin.id
);
error!(err);
+ context
+ .metrics
+ .increment_errors(&plugin_key, ConnectorType::Sink);
context.sinks.set_error(&plugin_key, &err).await;
return;
}
@@ -228,6 +238,7 @@ pub fn consume(sinks: Vec<SinkConnectorWrapper>, context: Arc<RuntimeContext>) {
}
}
+#[allow(clippy::too_many_arguments)]
async fn consume_messages(
plugin_id: u32,
decoder: Arc<dyn StreamDecoder>,
@@ -236,6 +247,8 @@ async fn consume_messages(
transforms: Vec<Arc<dyn Transform>>,
mut consumer: IggyConsumer,
verbose: bool,
+ plugin_key: &str,
+ metrics: &Arc<Metrics>,
) -> Result<(), RuntimeError> {
info!("Started consuming messages for sink connector with ID:
{plugin_id}");
let batch_size = batch_size as usize;
@@ -261,6 +274,7 @@ async fn consume_messages(
let messages = std::mem::take(&mut batch);
let messages_count = messages.len();
+ metrics.increment_messages_consumed(plugin_key, messages_count as u64);
let messages_metadata = MessagesMetadata {
partition_id,
current_offset,
@@ -278,7 +292,7 @@ async fn consume_messages(
);
}
let start = Instant::now();
- if let Err(error) = process_messages(
+ let processed_count = match process_messages(
plugin_id,
messages_metadata,
&topic_metadata,
@@ -289,12 +303,17 @@ async fn consume_messages(
)
.await
{
- error!(
- "Failed to process {messages_count} messages for sink
connector with ID: {plugin_id}. {error}",
- );
- return Err(error);
- }
+ Ok(count) => count,
+ Err(error) => {
+ error!(
+ "Failed to process {messages_count} messages for sink
connector with ID: {plugin_id}. {error}",
+ );
+ metrics.increment_errors(plugin_key, ConnectorType::Sink);
+ return Err(error);
+ }
+ };
+ metrics.increment_messages_processed(plugin_key, processed_count as u64);
let elapsed = start.elapsed();
if verbose {
info!(
@@ -341,7 +360,7 @@ async fn process_messages(
consume: &ConsumeCallback,
transforms: &Vec<Arc<dyn Transform>>,
decoder: &Arc<dyn StreamDecoder>,
-) -> Result<(), RuntimeError> {
+) -> Result<usize, RuntimeError> {
let messages = messages.into_iter().map(|message| ReceivedMessage {
id: message.header.id,
offset: message.header.offset,
@@ -453,6 +472,8 @@ async fn process_messages(
});
}
+ let processed_count = messages.len();
+
let topic_meta = postcard::to_allocvec(topic_metadata).map_err(|error| {
error!(
"Failed to serialize topic metadata for sink connector with ID:
{plugin_id}. {error}"
@@ -486,5 +507,5 @@ async fn process_messages(
messages.len(),
);
- Ok(())
+ Ok(processed_count)
}
diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs
index fba517989..c4f155b5a 100644
--- a/core/connectors/runtime/src/source.rs
+++ b/core/connectors/runtime/src/source.rs
@@ -39,6 +39,7 @@ use crate::configs::connectors::SourceConfig;
use crate::context::RuntimeContext;
use crate::log::LOG_CALLBACK;
use crate::manager::status::ConnectorStatus;
+use crate::metrics::ConnectorType;
use crate::{
PLUGIN_ID, RuntimeError, SourceApi, SourceConnector, SourceConnectorPlugin,
SourceConnectorProducer, SourceConnectorWrapper, resolve_plugin_path,
@@ -256,7 +257,11 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>, context: Arc<RuntimeContext>
context
.sources
- .update_status(&plugin_key, ConnectorStatus::Running)
+ .update_status(
+ &plugin_key,
+ ConnectorStatus::Running,
+ Some(&context.metrics),
+ )
.await;
let encoder = producer.encoder.clone();
let producer = &producer.producer;
@@ -269,6 +274,9 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>, context: Arc<RuntimeContext>
while let Ok(produced_messages) = receiver.recv_async().await {
let count = produced_messages.messages.len();
+ context
+ .metrics
+ .increment_messages_produced(&plugin_key, count as u64);
if plugin.verbose {
info!("Source connector with ID: {plugin_id} received
{count} messages",);
} else {
@@ -313,6 +321,9 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>, context: Arc<RuntimeContext>
producer.topic()
);
error!(err);
+ context
+ .metrics
+ .increment_errors(&plugin_key, ConnectorType::Source);
context.sources.set_error(&plugin_key, &err).await;
continue;
};
@@ -324,10 +335,17 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>, context: Arc<RuntimeContext>
producer.topic(),
);
error!(err);
+ context
+ .metrics
+ .increment_errors(&plugin_key, ConnectorType::Source);
context.sources.set_error(&plugin_key, &err).await;
continue;
}
+ context
+ .metrics
+ .increment_messages_sent(&plugin_key, count as u64);
+
if plugin.verbose {
info!(
"Sent {count} messages to stream: {}, topic: {} by
source connector with ID: {plugin_id}",
@@ -365,7 +383,11 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>, context: Arc<RuntimeContext>
info!("Source connector with ID: {plugin_id} stopped.");
context
.sources
- .update_status(&plugin_key, ConnectorStatus::Stopped)
+ .update_status(
+ &plugin_key,
+ ConnectorStatus::Stopped,
+ Some(&context.metrics),
+ )
.await;
});
}
diff --git a/core/connectors/runtime/src/stats.rs b/core/connectors/runtime/src/stats.rs
new file mode 100644
index 000000000..df2ca62ed
--- /dev/null
+++ b/core/connectors/runtime/src/stats.rs
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::context::RuntimeContext;
+use crate::metrics::ConnectorType;
+use iggy_common::IggyTimestamp;
+use serde::Serialize;
+use std::sync::Arc;
+use sysinfo::System;
+
+#[derive(Debug, Serialize)]
+pub struct ConnectorRuntimeStats {
+ pub process_id: u32,
+ pub cpu_usage: f32,
+ pub memory_usage: u64,
+ pub run_time: u64,
+ pub start_time: u64,
+ pub sources_total: u32,
+ pub sources_running: u32,
+ pub sinks_total: u32,
+ pub sinks_running: u32,
+ pub connectors: Vec<ConnectorStats>,
+}
+
+#[derive(Debug, Serialize)]
+pub struct ConnectorStats {
+ pub key: String,
+ pub name: String,
+ pub connector_type: String,
+ pub status: String,
+ pub enabled: bool,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub messages_produced: Option<u64>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub messages_sent: Option<u64>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub messages_consumed: Option<u64>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub messages_processed: Option<u64>,
+ pub errors: u64,
+}
+
+pub async fn get_runtime_stats(context: &Arc<RuntimeContext>) -> ConnectorRuntimeStats {
+ let pid = std::process::id();
+
+ let mut system = System::new();
+ system.refresh_processes(
+ sysinfo::ProcessesToUpdate::Some(&[sysinfo::Pid::from_u32(pid)]),
+ true,
+ );
+
+ let (cpu_usage, memory_usage) = system
+ .process(sysinfo::Pid::from_u32(pid))
+ .map(|p| (p.cpu_usage(), p.memory()))
+ .unwrap_or((0.0, 0));
+
+ let sources = context.sources.get_all().await;
+ let sinks = context.sinks.get_all().await;
+
+ let sources_total = context.metrics.get_sources_total();
+ let sinks_total = context.metrics.get_sinks_total();
+ let sources_running = context.metrics.get_sources_running();
+ let sinks_running = context.metrics.get_sinks_running();
+
+ let mut connectors = Vec::with_capacity(sources.len() + sinks.len());
+ for source in &sources {
+ connectors.push(ConnectorStats {
+ key: source.key.clone(),
+ name: source.name.clone(),
+ connector_type: "source".to_owned(),
+ status: source.status.to_string(),
+ enabled: source.enabled,
+ messages_produced: Some(context.metrics.get_messages_produced(&source.key)),
+ messages_sent: Some(context.metrics.get_messages_sent(&source.key)),
+ messages_consumed: None,
+ messages_processed: None,
+ errors: context
+ .metrics
+ .get_errors(&source.key, ConnectorType::Source),
+ });
+ }
+ for sink in &sinks {
+ connectors.push(ConnectorStats {
+ key: sink.key.clone(),
+ name: sink.name.clone(),
+ connector_type: "sink".to_owned(),
+ status: sink.status.to_string(),
+ enabled: sink.enabled,
+ messages_produced: None,
+ messages_sent: None,
+ messages_consumed: Some(context.metrics.get_messages_consumed(&sink.key)),
+ messages_processed: Some(context.metrics.get_messages_processed(&sink.key)),
+ errors: context.metrics.get_errors(&sink.key, ConnectorType::Sink),
+ });
+ }
+
+ let now = IggyTimestamp::now().as_micros();
+ let start = context.start_time.as_micros();
+ let run_time = now.saturating_sub(start);
+
+ ConnectorRuntimeStats {
+ process_id: pid,
+ cpu_usage,
+ memory_usage,
+ run_time,
+ start_time: start,
+ sources_total,
+ sources_running,
+ sinks_total,
+ sinks_running,
+ connectors,
+ }
+}
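
Putting the structs above together, a `/stats` response is shaped roughly like the JSON below; values and the status spelling are illustrative, `run_time`/`start_time` are microseconds per `IggyTimestamp::as_micros`, `memory_usage` is bytes as reported by sysinfo, and the sink-only counters are omitted for a source per the `skip_serializing_if` attributes:

    {
      "process_id": 4242,
      "cpu_usage": 1.25,
      "memory_usage": 52428800,
      "run_time": 120000000,
      "start_time": 1769638147000000,
      "sources_total": 1,
      "sources_running": 1,
      "sinks_total": 1,
      "sinks_running": 1,
      "connectors": [
        {
          "key": "example-source",
          "name": "Example source",
          "connector_type": "source",
          "status": "running",
          "enabled": true,
          "messages_produced": 10,
          "messages_sent": 10,
          "errors": 0
        }
      ]
    }
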
diff --git a/core/integration/tests/connectors/api/config.toml b/core/integration/tests/connectors/api/config.toml
new file mode 100644
index 000000000..b36fa71c5
--- /dev/null
+++ b/core/integration/tests/connectors/api/config.toml
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[http]
+enabled = true
+address = "0.0.0.0:0"
+api_key = "test-api-key"
+
+[http.metrics]
+enabled = true
+endpoint = "/metrics"
+
+[connectors]
+config_type = "local"
+config_dir = "tests/connectors/api/connectors"
diff --git a/core/integration/tests/connectors/api/connectors/.gitkeep b/core/integration/tests/connectors/api/connectors/.gitkeep
new file mode 100644
index 000000000..e69de29bb
diff --git a/core/integration/tests/connectors/api/endpoints.rs b/core/integration/tests/connectors/api/endpoints.rs
new file mode 100644
index 000000000..7bc9b7588
--- /dev/null
+++ b/core/integration/tests/connectors/api/endpoints.rs
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::connectors::api::setup;
+use reqwest::Client;
+
+const API_KEY: &str = "test-api-key";
+
+#[tokio::test]
+async fn test_root_endpoint_returns_welcome_message() {
+ let runtime = setup().await;
+ let api_address = runtime.connectors_api_address().unwrap();
+ let client = Client::new();
+
+ let response = client
+ .get(format!("{}/", api_address))
+ .send()
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), 200);
+ let body = response.text().await.unwrap();
+ assert_eq!(body, "Connector Runtime API");
+}
+
+#[tokio::test]
+async fn test_health_endpoint_returns_healthy() {
+ let runtime = setup().await;
+ let api_address = runtime.connectors_api_address().unwrap();
+ let client = Client::new();
+
+ let response = client
+ .get(format!("{}/health", api_address))
+ .send()
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), 200);
+ let body: serde_json::Value = response.json().await.unwrap();
+ assert_eq!(body["status"], "healthy");
+}
+
+#[tokio::test]
+async fn test_stats_endpoint_returns_runtime_stats() {
+ let runtime = setup().await;
+ let api_address = runtime.connectors_api_address().unwrap();
+ let client = Client::new();
+
+ let response = client
+ .get(format!("{}/stats", api_address))
+ .header("api-key", API_KEY)
+ .send()
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), 200);
+ let stats: serde_json::Value = response.json().await.unwrap();
+
+ assert!(stats.get("process_id").is_some());
+ assert!(stats.get("cpu_usage").is_some());
+ assert!(stats.get("memory_usage").is_some());
+ assert!(stats.get("run_time").is_some());
+ assert!(stats.get("start_time").is_some());
+ assert!(stats.get("sources_total").is_some());
+ assert!(stats.get("sources_running").is_some());
+ assert!(stats.get("sinks_total").is_some());
+ assert!(stats.get("sinks_running").is_some());
+ assert!(stats.get("connectors").is_some());
+
+ assert!(stats["connectors"].is_array());
+ assert_eq!(stats["sources_total"], 0);
+ assert_eq!(stats["sources_running"], 0);
+ assert_eq!(stats["sinks_total"], 0);
+ assert_eq!(stats["sinks_running"], 0);
+}
+
+#[tokio::test]
+async fn test_metrics_endpoint_returns_prometheus_format() {
+ let runtime = setup().await;
+ let api_address = runtime.connectors_api_address().unwrap();
+ let client = Client::new();
+
+ let response = client
+ .get(format!("{}/metrics", api_address))
+ .header("api-key", API_KEY)
+ .send()
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), 200);
+ let body = response.text().await.unwrap();
+
+ assert!(body.contains("iggy_connectors_sources_total"));
+ assert!(body.contains("iggy_connectors_sources_running"));
+ assert!(body.contains("iggy_connectors_sinks_total"));
+ assert!(body.contains("iggy_connectors_sinks_running"));
+}
+
+#[tokio::test]
+async fn test_sources_endpoint_returns_list() {
+ let runtime = setup().await;
+ let api_address = runtime.connectors_api_address().unwrap();
+ let client = Client::new();
+
+ let response = client
+ .get(format!("{}/sources", api_address))
+ .header("api-key", API_KEY)
+ .send()
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), 200);
+ let sources: serde_json::Value = response.json().await.unwrap();
+
+ assert!(sources.is_array());
+ assert_eq!(sources.as_array().unwrap().len(), 0);
+}
+
+#[tokio::test]
+async fn test_sinks_endpoint_returns_list() {
+ let runtime = setup().await;
+ let api_address = runtime.connectors_api_address().unwrap();
+ let client = Client::new();
+
+ let response = client
+ .get(format!("{}/sinks", api_address))
+ .header("api-key", API_KEY)
+ .send()
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), 200);
+ let sinks: serde_json::Value = response.json().await.unwrap();
+
+ assert!(sinks.is_array());
+ assert_eq!(sinks.as_array().unwrap().len(), 0);
+}
+
+#[tokio::test]
+async fn test_api_key_authentication_required() {
+ let runtime = setup().await;
+ let api_address = runtime.connectors_api_address().unwrap();
+ let client = Client::new();
+
+ let response = client
+ .get(format!("{}/stats", api_address))
+ .send()
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), 401);
+
+ let response = client
+ .get(format!("{}/metrics", api_address))
+ .send()
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), 401);
+
+ let response = client
+ .get(format!("{}/sources", api_address))
+ .send()
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), 401);
+
+ let response = client
+ .get(format!("{}/sinks", api_address))
+ .send()
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), 401);
+}
+
+#[tokio::test]
+async fn test_api_key_authentication_rejected_with_invalid_key() {
+ let runtime = setup().await;
+ let api_address = runtime.connectors_api_address().unwrap();
+ let client = Client::new();
+
+ let response = client
+ .get(format!("{}/stats", api_address))
+ .header("api-key", "invalid-key")
+ .send()
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), 401);
+
+ let response = client
+ .get(format!("{}/metrics", api_address))
+ .header("api-key", "wrong-api-key")
+ .send()
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), 401);
+}
diff --git a/core/integration/tests/connectors/api/mod.rs b/core/integration/tests/connectors/api/mod.rs
new file mode 100644
index 000000000..7e6f5fdcd
--- /dev/null
+++ b/core/integration/tests/connectors/api/mod.rs
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::connectors::{ConnectorsRuntime, IggySetup, setup_runtime};
+
+mod endpoints;
+
+pub(crate) async fn setup() -> ConnectorsRuntime {
+ let iggy_setup = IggySetup::default();
+ let mut runtime = setup_runtime();
+ runtime.init("api/config.toml", None, iggy_setup).await;
+ runtime
+}
diff --git a/core/integration/tests/connectors/mod.rs b/core/integration/tests/connectors/mod.rs
index ffbbf0dca..28ca40809 100644
--- a/core/integration/tests/connectors/mod.rs
+++ b/core/integration/tests/connectors/mod.rs
@@ -33,6 +33,7 @@ use integration::{
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
+mod api;
mod http_config_provider;
mod postgres;
mod random;
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index d311bc870..8adfda896 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -84,7 +84,7 @@ opentelemetry-otlp = { workspace = true }
opentelemetry-semantic-conventions = { workspace = true }
opentelemetry_sdk = { workspace = true }
papaya = "0.2.3"
-prometheus-client = "0.24.0"
+prometheus-client = { workspace = true }
rand = { workspace = true }
reqwest = { workspace = true, features = ["rustls-tls-no-provider"] }
ringbuffer = "0.16.0"