This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 126b5d67f feat(core): Implement http related metrics support for prom
client (#5798)
126b5d67f is described below
commit 126b5d67f7662f74793a185ac2c2b850c704ce6e
Author: Xuanwo <[email protected]>
AuthorDate: Tue Mar 18 14:56:56 2025 +0800
feat(core): Implement http related metrics support for prom client (#5798)
Signed-off-by: Xuanwo <[email protected]>
---
core/src/layers/observe/metrics.rs | 73 ++++++++++++++++++-
core/src/layers/observe/mod.rs | 16 +++--
core/src/layers/prometheus_client.rs | 135 +++++++++++++++++++++++++++++++++++
core/src/raw/http_util/client.rs | 5 ++
core/src/raw/http_util/mod.rs | 1 +
5 files changed, 221 insertions(+), 9 deletions(-)
diff --git a/core/src/layers/observe/metrics.rs
b/core/src/layers/observe/metrics.rs
index 607e52905..7a73fcec1 100644
--- a/core/src/layers/observe/metrics.rs
+++ b/core/src/layers/observe/metrics.rs
@@ -67,6 +67,16 @@ pub static METRIC_OPERATION_ERRORS_TOTAL: MetricMetadata =
MetricMetadata {
name: "operation_errors_total",
help: "Error counter during opendal operations",
};
+/// Metric metadata (name and help text) for the histogram of http request durations, in seconds.
+pub static METRIC_HTTP_REQUEST_DURATION_SECONDS: MetricMetadata =
MetricMetadata {
+ name: "http_request_duration_seconds",
+ help: "Histogram of time spent during http requests",
+};
+/// Metric metadata (name and help text) for the histogram of bytes transferred during http requests.
+pub static METRIC_HTTP_REQUEST_BYTES: MetricMetadata = MetricMetadata {
+ name: "http_request_bytes",
+ help: "Histogram of the bytes transferred during http requests",
+};
/// The metric label for the scheme like s3, fs, cos.
pub static LABEL_SCHEME: &str = "scheme";
@@ -111,6 +121,21 @@ pub trait MetricsIntercept: Debug + Clone + Send + Sync +
Unpin + 'static {
op: Operation,
error: ErrorKind,
);
+
+ /// Observe the duration of one http request; this default implementation ignores it.
+ fn observe_http_request_duration_seconds(
+ &self,
+ info: Arc<AccessorInfo>,
+ op: Operation,
+ duration: Duration,
+ ) {
+ let _ = (info, op, duration); // no-op default: interceptors opt in by overriding
+ }
+
+ /// Observe the bytes transferred in an http request issued by operations like read and
write.
+ fn observe_http_request_bytes(&self, info: Arc<AccessorInfo>, op:
Operation, bytes: usize) {
+ let _ = (info, op, bytes); // no-op default: interceptors opt in by overriding
+ }
}
/// The metrics layer for opendal.
@@ -132,18 +157,60 @@ impl<A: Access, I: MetricsIntercept> Layer<A> for
MetricsLayer<I> {
fn layer(&self, inner: A) -> Self::LayeredAccess {
let info = inner.info();
+ // Replace the accessor's http client with one whose fetcher is wrapped in MetricsHttpFetcher.
+ info.update_http_client(|client| {
+ HttpClient::with(MetricsHttpFetcher {
+ inner: client.into_inner(),
+ info: info.clone(),
+ interceptor: self.interceptor.clone(),
+ })
+ });
+
MetricsAccessor {
- inner: Arc::new(inner),
+ inner,
info,
interceptor: self.interceptor.clone(),
}
}
}
+/// An http fetcher wrapper that reports request metrics to a `MetricsIntercept`.
+pub struct MetricsHttpFetcher<I: MetricsIntercept> {
+ inner: HttpFetcher, // the wrapped fetcher that actually performs the request
+ info: Arc<AccessorInfo>, // accessor info passed along with every observation
+ interceptor: I, // receives the duration and byte observations
+}
+
+impl<I: MetricsIntercept> HttpFetch for MetricsHttpFetcher<I> {
+ async fn fetch(&self, req: http::Request<Buffer>) ->
Result<http::Response<HttpBody>> {
+ // Read the request body size and the Operation extension before `req` is moved into the inner fetch.
+ let size = req.body().len(); // length of the request body, not of the response
+ let op = req.extensions().get::<Operation>().copied();
+
+ let start = Instant::now();
+ let res = self.inner.fetch(req).await;
+ let duration = start.elapsed(); // time until the inner fetch returned its result
+
+ // Metrics are recorded only when the request carries an Operation and the fetch returned Ok.
+ if let Some(op) = op {
+ if res.is_ok() {
+ self.interceptor.observe_http_request_duration_seconds(
+ self.info.clone(),
+ op,
+ duration,
+ );
+ self.interceptor
+ .observe_http_request_bytes(self.info.clone(), op, size);
+ }
+ }
+
+ res
+ }
+}
+
/// The metrics accessor for opendal.
-#[derive(Clone)]
pub struct MetricsAccessor<A: Access, I: MetricsIntercept> {
- inner: Arc<A>,
+ inner: A,
info: Arc<AccessorInfo>,
interceptor: I,
}
diff --git a/core/src/layers/observe/mod.rs b/core/src/layers/observe/mod.rs
index 5465bcd0a..96aca4b8d 100644
--- a/core/src/layers/observe/mod.rs
+++ b/core/src/layers/observe/mod.rs
@@ -19,15 +19,17 @@
//!
//! This module offers essential components to facilitate the implementation
of observability in OpenDAL.
//!
-//! # Prometheus Metrics
+//! # Metrics
//!
//! These metrics are essential for understanding the behavior and performance
of our applications.
//!
-//! | Metric Name | Type | Description
| Labels
|
-//!
|------------------------------|-----------|--------------------------------------------------------------|-------------------------------------------------|
-//! | operation_duration_seconds | Histogram | Histogram of time spent
during opendal operations | scheme, namespace, root, operation, path
|
-//! | operation_bytes. | Histogram | Histogram of the bytes
transferred during opendal operations | scheme, operation, root, operation,
path |
-//! | operation_errors_total | Counter | Error counter during opendal
operations | scheme, operation, root, operation, path,
error |
+//! | Metric Name | Type | Description
| Labels
|
+//!
|-------------------------------|-----------|--------------------------------------------------------------|-------------------------------------------------|
+//! | operation_duration_seconds | Histogram | Histogram of time spent
during opendal operations | scheme, namespace, root, operation, path
|
+//! | operation_bytes | Histogram | Histogram of the bytes
transferred during opendal operations | scheme, namespace, root, operation,
path |
+//! | operation_errors_total | Counter | Error counter during opendal
operations | scheme, namespace, root, operation, path,
error |
+//! | http_request_duration_seconds | Histogram | Histogram of time spent
during http requests | scheme, namespace, root, operation
|
+//! | http_request_bytes | Histogram | Histogram of the bytes
transferred during http requests | scheme, namespace, root, operation
|
mod metrics;
@@ -41,6 +43,8 @@ pub use metrics::LABEL_OPERATION;
pub use metrics::LABEL_PATH;
pub use metrics::LABEL_ROOT;
pub use metrics::LABEL_SCHEME;
+pub use metrics::METRIC_HTTP_REQUEST_BYTES;
+pub use metrics::METRIC_HTTP_REQUEST_DURATION_SECONDS;
pub use metrics::METRIC_OPERATION_BYTES;
pub use metrics::METRIC_OPERATION_DURATION_SECONDS;
pub use metrics::METRIC_OPERATION_ERRORS_TOTAL;
diff --git a/core/src/layers/prometheus_client.rs
b/core/src/layers/prometheus_client.rs
index 2b56b4808..de5c1f4a1 100644
--- a/core/src/layers/prometheus_client.rs
+++ b/core/src/layers/prometheus_client.rs
@@ -128,9 +128,13 @@ impl PrometheusClientLayer {
pub fn builder() -> PrometheusClientLayerBuilder {
let operation_duration_seconds_buckets = exponential_buckets(0.01,
2.0, 16).collect();
let operation_bytes_buckets = exponential_buckets(1.0, 2.0,
16).collect();
+ let http_request_duration_seconds_buckets = exponential_buckets(0.01,
2.0, 16).collect();
+ let http_request_bytes_buckets = exponential_buckets(1.0, 2.0,
16).collect();
PrometheusClientLayerBuilder::new(
operation_duration_seconds_buckets,
operation_bytes_buckets,
+ http_request_duration_seconds_buckets,
+ http_request_bytes_buckets,
0,
)
}
@@ -148,6 +152,8 @@ impl<A: Access> Layer<A> for PrometheusClientLayer {
pub struct PrometheusClientLayerBuilder {
operation_duration_seconds_buckets: Vec<f64>,
operation_bytes_buckets: Vec<f64>,
+ http_request_duration_seconds_buckets: Vec<f64>,
+ http_request_bytes_buckets: Vec<f64>,
path_label_level: usize,
}
@@ -155,11 +161,15 @@ impl PrometheusClientLayerBuilder {
fn new(
operation_duration_seconds_buckets: Vec<f64>,
operation_bytes_buckets: Vec<f64>,
+ http_request_duration_seconds_buckets: Vec<f64>,
+ http_request_bytes_buckets: Vec<f64>,
path_label_level: usize,
) -> Self {
Self {
operation_duration_seconds_buckets,
operation_bytes_buckets,
+ http_request_duration_seconds_buckets,
+ http_request_bytes_buckets,
path_label_level,
}
}
@@ -240,6 +250,82 @@ impl PrometheusClientLayerBuilder {
self
}
+ /// Set buckets for the `http_request_duration_seconds` histogram; an empty `buckets` is ignored.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// # use log::debug;
+ /// # use opendal::layers::PrometheusClientLayer;
+ /// # use opendal::services;
+ /// # use opendal::Operator;
+ /// # use opendal::Result;
+ ///
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<()> {
+ /// // Pick a builder and configure it.
+ /// let builder = services::Memory::default();
+ /// let mut registry = prometheus_client::registry::Registry::default();
+ ///
+ /// let buckets =
+ /// prometheus_client::metrics::histogram::exponential_buckets(0.01,
2.0, 16).collect();
+ /// let op = Operator::new(builder)?
+ /// .layer(
+ /// PrometheusClientLayer::builder()
+ /// .http_request_duration_seconds_buckets(buckets)
+ /// .register(&mut registry),
+ /// )
+ /// .finish();
+ /// debug!("operator: {op:?}");
+ ///
+ /// Ok(())
+ /// # }
+ /// ```
+ pub fn http_request_duration_seconds_buckets(mut self, buckets: Vec<f64>)
-> Self {
+ if !buckets.is_empty() { // keep the current buckets when given an empty list
+ self.http_request_duration_seconds_buckets = buckets;
+ }
+ self
+ }
+
+ /// Set buckets for the `http_request_bytes` histogram; an empty `buckets` is ignored.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// # use log::debug;
+ /// # use opendal::layers::PrometheusClientLayer;
+ /// # use opendal::services;
+ /// # use opendal::Operator;
+ /// # use opendal::Result;
+ ///
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<()> {
+ /// // Pick a builder and configure it.
+ /// let builder = services::Memory::default();
+ /// let mut registry = prometheus_client::registry::Registry::default();
+ ///
+ /// let buckets =
+ /// prometheus_client::metrics::histogram::exponential_buckets(1.0,
2.0, 16).collect();
+ /// let op = Operator::new(builder)?
+ /// .layer(
+ /// PrometheusClientLayer::builder()
+ /// .http_request_bytes_buckets(buckets)
+ /// .register(&mut registry),
+ /// )
+ /// .finish();
+ /// debug!("operator: {op:?}");
+ ///
+ /// Ok(())
+ /// # }
+ /// ```
+ pub fn http_request_bytes_buckets(mut self, buckets: Vec<f64>) -> Self {
+ if !buckets.is_empty() { // keep the current buckets when given an empty list
+ self.http_request_bytes_buckets = buckets;
+ }
+ self
+ }
+
/// Set the level of path label.
///
/// - level = 0: we will ignore the path label.
@@ -313,6 +399,14 @@ impl PrometheusClientLayerBuilder {
buckets: self.operation_bytes_buckets,
});
let operation_errors_total = Family::<OperationLabels,
Counter>::default();
+ let http_request_duration_seconds =
+ Family::<OperationLabels, Histogram,
_>::new_with_constructor(HistogramConstructor {
+ buckets: self.http_request_duration_seconds_buckets,
+ });
+ let http_request_bytes =
+ Family::<OperationLabels, Histogram,
_>::new_with_constructor(HistogramConstructor {
+ buckets: self.http_request_bytes_buckets,
+ });
registry.register(
observe::METRIC_OPERATION_DURATION_SECONDS.name(),
@@ -331,12 +425,24 @@ impl PrometheusClientLayerBuilder {
observe::METRIC_OPERATION_ERRORS_TOTAL.help(),
operation_errors_total.clone(),
);
+ registry.register(
+ observe::METRIC_HTTP_REQUEST_DURATION_SECONDS.name(),
+ observe::METRIC_HTTP_REQUEST_DURATION_SECONDS.help(),
+ http_request_duration_seconds.clone(), // the http family the interceptor records into, not the operation one
+ );
+ registry.register(
+ observe::METRIC_HTTP_REQUEST_BYTES.name(),
+ observe::METRIC_HTTP_REQUEST_BYTES.help(),
+ http_request_bytes.clone(), // the http family the interceptor records into, not the operation one
+ );
PrometheusClientLayer {
interceptor: PrometheusClientInterceptor {
operation_duration_seconds,
operation_bytes,
operation_errors_total,
+ http_request_duration_seconds,
+ http_request_bytes,
path_label_level: self.path_label_level,
},
}
@@ -359,6 +465,8 @@ pub struct PrometheusClientInterceptor {
operation_duration_seconds: Family<OperationLabels, Histogram,
HistogramConstructor>,
operation_bytes: Family<OperationLabels, Histogram, HistogramConstructor>,
operation_errors_total: Family<OperationLabels, Counter>,
+ http_request_duration_seconds: Family<OperationLabels, Histogram,
HistogramConstructor>,
+ http_request_bytes: Family<OperationLabels, Histogram,
HistogramConstructor>,
path_label_level: usize,
}
@@ -413,6 +521,33 @@ impl observe::MetricsIntercept for
PrometheusClientInterceptor {
})
.inc();
}
+
+ fn observe_http_request_duration_seconds(
+ &self,
+ info: Arc<AccessorInfo>,
+ op: Operation,
+ duration: Duration,
+ ) {
+ self.http_request_duration_seconds
+ .get_or_create(&OperationLabels {
+ info,
+ operation: op,
+ path: None, // http metrics are recorded without a path label
+ error: None, // http metrics are recorded without an error label
+ })
+ .observe(duration.as_secs_f64()) // the histogram sample is the duration in seconds
+ }
+
+ fn observe_http_request_bytes(&self, info: Arc<AccessorInfo>, op:
Operation, bytes: usize) {
+ self.http_request_bytes
+ .get_or_create(&OperationLabels {
+ info,
+ operation: op,
+ path: None, // http metrics are recorded without a path label
+ error: None, // http metrics are recorded without an error label
+ })
+ .observe(bytes as f64) // the histogram sample is the byte count as f64
+ }
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs
index 117973194..1e302cc0f 100644
--- a/core/src/raw/http_util/client.rs
+++ b/core/src/raw/http_util/client.rs
@@ -82,6 +82,11 @@ impl HttpClient {
Self { fetcher }
}
+ /// Consume the http client and return its inner http fetcher.
+ pub(crate) fn into_inner(self) -> HttpFetcher {
+ self.fetcher
+ }
+
/// Build a new http client in async context.
#[deprecated]
pub fn build(builder: reqwest::ClientBuilder) -> Result<Self> {
diff --git a/core/src/raw/http_util/mod.rs b/core/src/raw/http_util/mod.rs
index 86ebce174..8aaab3f00 100644
--- a/core/src/raw/http_util/mod.rs
+++ b/core/src/raw/http_util/mod.rs
@@ -25,6 +25,7 @@
mod client;
pub use client::HttpClient;
pub use client::HttpFetch;
+pub use client::HttpFetcher;
/// temporary client used by several features
#[allow(unused_imports)]