This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 126b5d67f feat(core): Implement http related metrics support for prom 
client (#5798)
126b5d67f is described below

commit 126b5d67f7662f74793a185ac2c2b850c704ce6e
Author: Xuanwo <[email protected]>
AuthorDate: Tue Mar 18 14:56:56 2025 +0800

    feat(core): Implement http related metrics support for prom client (#5798)
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/layers/observe/metrics.rs   |  73 ++++++++++++++++++-
 core/src/layers/observe/mod.rs       |  16 +++--
 core/src/layers/prometheus_client.rs | 135 +++++++++++++++++++++++++++++++++++
 core/src/raw/http_util/client.rs     |   5 ++
 core/src/raw/http_util/mod.rs        |   1 +
 5 files changed, 221 insertions(+), 9 deletions(-)

diff --git a/core/src/layers/observe/metrics.rs 
b/core/src/layers/observe/metrics.rs
index 607e52905..7a73fcec1 100644
--- a/core/src/layers/observe/metrics.rs
+++ b/core/src/layers/observe/metrics.rs
@@ -67,6 +67,16 @@ pub static METRIC_OPERATION_ERRORS_TOTAL: MetricMetadata = 
MetricMetadata {
     name: "operation_errors_total",
     help: "Error counter during opendal operations",
 };
+/// The metric metadata for the http request duration in seconds.
+pub static METRIC_HTTP_REQUEST_DURATION_SECONDS: MetricMetadata = 
MetricMetadata {
+    name: "http_request_duration_seconds",
+    help: "Histogram of time spent during http requests",
+};
+/// The metric metadata for the http request bytes.
+pub static METRIC_HTTP_REQUEST_BYTES: MetricMetadata = MetricMetadata {
+    name: "http_request_bytes",
+    help: "Histogram of the bytes transferred during http requests",
+};
 
 /// The metric label for the scheme like s3, fs, cos.
 pub static LABEL_SCHEME: &str = "scheme";
@@ -111,6 +121,21 @@ pub trait MetricsIntercept: Debug + Clone + Send + Sync + 
Unpin + 'static {
         op: Operation,
         error: ErrorKind,
     );
+
+    /// Observe the http request duration in seconds.
+    fn observe_http_request_duration_seconds(
+        &self,
+        info: Arc<AccessorInfo>,
+        op: Operation,
+        duration: Duration,
+    ) {
+        let _ = (info, op, duration);
+    }
+
+    /// Observe the operation bytes happened in http request like read and 
write.
+    fn observe_http_request_bytes(&self, info: Arc<AccessorInfo>, op: 
Operation, bytes: usize) {
+        let _ = (info, op, bytes);
+    }
 }
 
 /// The metrics layer for opendal.
@@ -132,18 +157,60 @@ impl<A: Access, I: MetricsIntercept> Layer<A> for 
MetricsLayer<I> {
     fn layer(&self, inner: A) -> Self::LayeredAccess {
         let info = inner.info();
 
+        // Update http client with metrics http fetcher.
+        info.update_http_client(|client| {
+            HttpClient::with(MetricsHttpFetcher {
+                inner: client.into_inner(),
+                info: info.clone(),
+                interceptor: self.interceptor.clone(),
+            })
+        });
+
         MetricsAccessor {
-            inner: Arc::new(inner),
+            inner,
             info,
             interceptor: self.interceptor.clone(),
         }
     }
 }
 
+/// The metrics http fetcher for opendal.
+pub struct MetricsHttpFetcher<I: MetricsIntercept> {
+    inner: HttpFetcher,
+    info: Arc<AccessorInfo>,
+    interceptor: I,
+}
+
+impl<I: MetricsIntercept> HttpFetch for MetricsHttpFetcher<I> {
+    async fn fetch(&self, req: http::Request<Buffer>) -> 
Result<http::Response<HttpBody>> {
+        // Extract context from the http request.
+        let size = req.body().len();
+        let op = req.extensions().get::<Operation>().copied();
+
+        let start = Instant::now();
+        let res = self.inner.fetch(req).await;
+        let duration = start.elapsed();
+
+        // We only inject the metrics when the operation exists.
+        if let Some(op) = op {
+            if res.is_ok() {
+                self.interceptor.observe_http_request_duration_seconds(
+                    self.info.clone(),
+                    op,
+                    duration,
+                );
+                self.interceptor
+                    .observe_http_request_bytes(self.info.clone(), op, size);
+            }
+        }
+
+        res
+    }
+}
+
 /// The metrics accessor for opendal.
-#[derive(Clone)]
 pub struct MetricsAccessor<A: Access, I: MetricsIntercept> {
-    inner: Arc<A>,
+    inner: A,
     info: Arc<AccessorInfo>,
     interceptor: I,
 }
diff --git a/core/src/layers/observe/mod.rs b/core/src/layers/observe/mod.rs
index 5465bcd0a..96aca4b8d 100644
--- a/core/src/layers/observe/mod.rs
+++ b/core/src/layers/observe/mod.rs
@@ -19,15 +19,17 @@
 //!
 //! This module offers essential components to facilitate the implementation 
of observability in OpenDAL.
 //!
-//! # Prometheus Metrics
+//! # Metrics
 //!
 //! These metrics are essential for understanding the behavior and performance 
of our applications.
 //!
-//! | Metric Name                  | Type      | Description                   
                               | Labels                                         
 |
-//! 
|------------------------------|-----------|--------------------------------------------------------------|-------------------------------------------------|
-//! | operation_duration_seconds   | Histogram | Histogram of time spent 
during opendal operations            | scheme, namespace, root, operation, path 
       |
-//! | operation_bytes.             | Histogram | Histogram of the bytes 
transferred during opendal operations | scheme, operation, root, operation, 
path        |
-//! | operation_errors_total       | Counter   | Error counter during opendal 
operations                      | scheme, operation, root, operation, path, 
error |
+//! | Metric Name                   | Type      | Description                  
                                | Labels                                        
  |
+//! 
|-------------------------------|-----------|--------------------------------------------------------------|-------------------------------------------------|
+//! | operation_duration_seconds    | Histogram | Histogram of time spent 
during opendal operations            | scheme, namespace, root, operation, path 
       |
+//! | operation_bytes.              | Histogram | Histogram of the bytes 
transferred during opendal operations | scheme, namespace, root, operation, 
path        |
+//! | operation_errors_total        | Counter   | Error counter during opendal 
operations                      | scheme, namespace, root, operation, path, 
error |
+//! | http_request_duration_seconds | Histogram | Histogram of time spent 
during http requests                 | scheme, namespace, root, operation       
       |
+//! | http_request_bytes.           | Histogram | Histogram of the bytes 
transferred during http requests      | scheme, namespace, root, operation      
        |
 
 mod metrics;
 
@@ -41,6 +43,8 @@ pub use metrics::LABEL_OPERATION;
 pub use metrics::LABEL_PATH;
 pub use metrics::LABEL_ROOT;
 pub use metrics::LABEL_SCHEME;
+pub use metrics::METRIC_HTTP_REQUEST_BYTES;
+pub use metrics::METRIC_HTTP_REQUEST_DURATION_SECONDS;
 pub use metrics::METRIC_OPERATION_BYTES;
 pub use metrics::METRIC_OPERATION_DURATION_SECONDS;
 pub use metrics::METRIC_OPERATION_ERRORS_TOTAL;
diff --git a/core/src/layers/prometheus_client.rs 
b/core/src/layers/prometheus_client.rs
index 2b56b4808..de5c1f4a1 100644
--- a/core/src/layers/prometheus_client.rs
+++ b/core/src/layers/prometheus_client.rs
@@ -128,9 +128,13 @@ impl PrometheusClientLayer {
     pub fn builder() -> PrometheusClientLayerBuilder {
         let operation_duration_seconds_buckets = exponential_buckets(0.01, 
2.0, 16).collect();
         let operation_bytes_buckets = exponential_buckets(1.0, 2.0, 
16).collect();
+        let http_request_duration_seconds_buckets = exponential_buckets(0.01, 
2.0, 16).collect();
+        let http_request_bytes_buckets = exponential_buckets(1.0, 2.0, 
16).collect();
         PrometheusClientLayerBuilder::new(
             operation_duration_seconds_buckets,
             operation_bytes_buckets,
+            http_request_duration_seconds_buckets,
+            http_request_bytes_buckets,
             0,
         )
     }
@@ -148,6 +152,8 @@ impl<A: Access> Layer<A> for PrometheusClientLayer {
 pub struct PrometheusClientLayerBuilder {
     operation_duration_seconds_buckets: Vec<f64>,
     operation_bytes_buckets: Vec<f64>,
+    http_request_duration_seconds_buckets: Vec<f64>,
+    http_request_bytes_buckets: Vec<f64>,
     path_label_level: usize,
 }
 
@@ -155,11 +161,15 @@ impl PrometheusClientLayerBuilder {
     fn new(
         operation_duration_seconds_buckets: Vec<f64>,
         operation_bytes_buckets: Vec<f64>,
+        http_request_duration_seconds_buckets: Vec<f64>,
+        http_request_bytes_buckets: Vec<f64>,
         path_label_level: usize,
     ) -> Self {
         Self {
             operation_duration_seconds_buckets,
             operation_bytes_buckets,
+            http_request_duration_seconds_buckets,
+            http_request_bytes_buckets,
             path_label_level,
         }
     }
@@ -240,6 +250,82 @@ impl PrometheusClientLayerBuilder {
         self
     }
 
+    /// Set buckets for `http_request_duration_seconds` histogram.
+    ///
+    /// # Examples
+    ///
+    /// ```no_run
+    /// # use log::debug;
+    /// # use opendal::layers::PrometheusClientLayer;
+    /// # use opendal::services;
+    /// # use opendal::Operator;
+    /// # use opendal::Result;
+    ///
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// // Pick a builder and configure it.
+    /// let builder = services::Memory::default();
+    /// let mut registry = prometheus_client::registry::Registry::default();
+    ///
+    /// let buckets =
+    ///     prometheus_client::metrics::histogram::exponential_buckets(0.01, 
2.0, 16).collect();
+    /// let op = Operator::new(builder)?
+    ///     .layer(
+    ///         PrometheusClientLayer::builder()
+    ///             .http_request_duration_seconds_buckets(buckets)
+    ///             .register(&mut registry),
+    ///     )
+    ///     .finish();
+    /// debug!("operator: {op:?}");
+    ///
+    /// Ok(())
+    /// # }
+    /// ```
+    pub fn http_request_duration_seconds_buckets(mut self, buckets: Vec<f64>) 
-> Self {
+        if !buckets.is_empty() {
+            self.http_request_duration_seconds_buckets = buckets;
+        }
+        self
+    }
+
+    /// Set buckets for `operation_bytes` histogram.
+    ///
+    /// # Examples
+    ///
+    /// ```no_run
+    /// # use log::debug;
+    /// # use opendal::layers::PrometheusClientLayer;
+    /// # use opendal::services;
+    /// # use opendal::Operator;
+    /// # use opendal::Result;
+    ///
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// // Pick a builder and configure it.
+    /// let builder = services::Memory::default();
+    /// let mut registry = prometheus_client::registry::Registry::default();
+    ///
+    /// let buckets =
+    ///     prometheus_client::metrics::histogram::exponential_buckets(1.0, 
2.0, 16).collect();
+    /// let op = Operator::new(builder)?
+    ///     .layer(
+    ///         PrometheusClientLayer::builder()
+    ///             .http_request_bytes_buckets(buckets)
+    ///             .register(&mut registry),
+    ///     )
+    ///     .finish();
+    /// debug!("operator: {op:?}");
+    ///
+    /// Ok(())
+    /// # }
+    /// ```
+    pub fn http_request_bytes_buckets(mut self, buckets: Vec<f64>) -> Self {
+        if !buckets.is_empty() {
+            self.http_request_bytes_buckets = buckets;
+        }
+        self
+    }
+
     /// Set the level of path label.
     ///
     /// - level = 0: we will ignore the path label.
@@ -313,6 +399,14 @@ impl PrometheusClientLayerBuilder {
                 buckets: self.operation_bytes_buckets,
             });
         let operation_errors_total = Family::<OperationLabels, 
Counter>::default();
+        let http_request_duration_seconds =
+            Family::<OperationLabels, Histogram, 
_>::new_with_constructor(HistogramConstructor {
+                buckets: self.http_request_duration_seconds_buckets,
+            });
+        let http_request_bytes =
+            Family::<OperationLabels, Histogram, 
_>::new_with_constructor(HistogramConstructor {
+                buckets: self.http_request_bytes_buckets,
+            });
 
         registry.register(
             observe::METRIC_OPERATION_DURATION_SECONDS.name(),
@@ -331,12 +425,24 @@ impl PrometheusClientLayerBuilder {
             observe::METRIC_OPERATION_ERRORS_TOTAL.help(),
             operation_errors_total.clone(),
         );
+        registry.register(
+            observe::METRIC_HTTP_REQUEST_DURATION_SECONDS.name(),
+            observe::METRIC_HTTP_REQUEST_DURATION_SECONDS.help(),
+            operation_duration_seconds.clone(),
+        );
+        registry.register(
+            observe::METRIC_HTTP_REQUEST_BYTES.name(),
+            observe::METRIC_HTTP_REQUEST_BYTES.help(),
+            operation_bytes.clone(),
+        );
 
         PrometheusClientLayer {
             interceptor: PrometheusClientInterceptor {
                 operation_duration_seconds,
                 operation_bytes,
                 operation_errors_total,
+                http_request_duration_seconds,
+                http_request_bytes,
                 path_label_level: self.path_label_level,
             },
         }
@@ -359,6 +465,8 @@ pub struct PrometheusClientInterceptor {
     operation_duration_seconds: Family<OperationLabels, Histogram, 
HistogramConstructor>,
     operation_bytes: Family<OperationLabels, Histogram, HistogramConstructor>,
     operation_errors_total: Family<OperationLabels, Counter>,
+    http_request_duration_seconds: Family<OperationLabels, Histogram, 
HistogramConstructor>,
+    http_request_bytes: Family<OperationLabels, Histogram, 
HistogramConstructor>,
     path_label_level: usize,
 }
 
@@ -413,6 +521,33 @@ impl observe::MetricsIntercept for 
PrometheusClientInterceptor {
             })
             .inc();
     }
+
+    fn observe_http_request_duration_seconds(
+        &self,
+        info: Arc<AccessorInfo>,
+        op: Operation,
+        duration: Duration,
+    ) {
+        self.http_request_duration_seconds
+            .get_or_create(&OperationLabels {
+                info,
+                operation: op,
+                path: None,
+                error: None,
+            })
+            .observe(duration.as_secs_f64())
+    }
+
+    fn observe_http_request_bytes(&self, info: Arc<AccessorInfo>, op: 
Operation, bytes: usize) {
+        self.http_request_bytes
+            .get_or_create(&OperationLabels {
+                info,
+                operation: op,
+                path: None,
+                error: None,
+            })
+            .observe(bytes as f64)
+    }
 }
 
 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs
index 117973194..1e302cc0f 100644
--- a/core/src/raw/http_util/client.rs
+++ b/core/src/raw/http_util/client.rs
@@ -82,6 +82,11 @@ impl HttpClient {
         Self { fetcher }
     }
 
+    /// Get the inner http client.
+    pub(crate) fn into_inner(self) -> HttpFetcher {
+        self.fetcher
+    }
+
     /// Build a new http client in async context.
     #[deprecated]
     pub fn build(builder: reqwest::ClientBuilder) -> Result<Self> {
diff --git a/core/src/raw/http_util/mod.rs b/core/src/raw/http_util/mod.rs
index 86ebce174..8aaab3f00 100644
--- a/core/src/raw/http_util/mod.rs
+++ b/core/src/raw/http_util/mod.rs
@@ -25,6 +25,7 @@
 mod client;
 pub use client::HttpClient;
 pub use client::HttpFetch;
+pub use client::HttpFetcher;
 
 /// temporary client used by several features
 #[allow(unused_imports)]

Reply via email to