This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 30de95ed6 feat: allow using `prometheus-client` crate with 
PrometheusClientLayer (#3134)
30de95ed6 is described below

commit 30de95ed67581cf2265c053971800aef6f7fddc7
Author: flaneur <[email protected]>
AuthorDate: Tue Sep 19 20:04:42 2023 +0800

    feat: allow using `prometheus-client` crate with PrometheusClientLayer 
(#3134)
    
    * refactor prometheus layer
    
    * add prometheus-client to deps
    
    * chore: simplify imports
    
    * refactor the metrics into a trait
    
    * feat: add implementation with prometheus-client
    
    * fix: allow using different trait
    
    * cargo fmt
    
    * refactor: add a seperate layer
    
    * fix: docs
    
    * fix typo
    
    * fix: cargo fmt
    
    * add a prefix
    
    * use structed labels
    
    * use labels in array
    
    * remove the unused metrics
    
    * rename stats to metrics
    
    * record request duration in wrapper
    
    * fix fmt
---
 Cargo.lock                           |  30 ++
 core/Cargo.toml                      |   5 +-
 core/src/layers/mod.rs               |   5 +
 core/src/layers/prometheus_client.rs | 675 +++++++++++++++++++++++++++++++++++
 4 files changed, 714 insertions(+), 1 deletion(-)

diff --git a/Cargo.lock b/Cargo.lock
index 24bc8a76f..10e8741c7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1572,6 +1572,12 @@ version = "1.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "9ea835d29036a4087793836fa931b08837ad5e957da9e23886b29586fb9b6650"
 
+[[package]]
+name = "dtoa"
+version = "1.0.9"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "dcbb2bf8e87535c23f7a8a321e364ce21462d0ff10cb6407820e8e96dfff6653"
+
 [[package]]
 name = "either"
 version = "1.8.1"
@@ -3580,6 +3586,7 @@ dependencies = [
  "pin-project",
  "pretty_assertions",
  "prometheus",
+ "prometheus-client",
  "prost",
  "quick-xml",
  "rand 0.8.5",
@@ -4493,6 +4500,29 @@ dependencies = [
  "thiserror",
 ]
 
+[[package]]
+name = "prometheus-client"
+version = "0.21.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "3c99afa9a01501019ac3a14d71d9f94050346f55ca471ce90c799a15c58f61e2"
+dependencies = [
+ "dtoa",
+ "itoa",
+ "parking_lot 0.12.1",
+ "prometheus-client-derive-encode",
+]
+
+[[package]]
+name = "prometheus-client-derive-encode"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.23",
+]
+
 [[package]]
 name = "prost"
 version = "0.11.9"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 33232299d..e09d94c51 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -83,8 +83,10 @@ layers-all = [
 layers-chaos = ["dep:rand"]
 # Enable layers metrics support
 layers-metrics = ["dep:metrics"]
-# Enable layers prometheus support
+# Enable layers prometheus support, with tikv/prometheus-rs crate
 layers-prometheus = ["dep:prometheus"]
+# Enable layers prometheus support, with prometheus-client crate
+layers-prometheus-client = ["dep:prometheus-client"]
 # Enable layers madsim support
 layers-madsim = ["dep:madsim"]
 # Enable layers minitrace support.
@@ -243,6 +245,7 @@ percent-encoding = "2"
 persy = { version = "1.4.4", optional = true }
 pin-project = "1"
 prometheus = { version = "0.13", features = ["process"], optional = true }
+prometheus-client = { version = "0.21.2", optional = true }
 prost = { version = "0.11", optional = true }
 quick-xml = { version = "0.29", features = ["serialize", "overlapped-lists"] }
 rand = { version = "0.8", optional = true }
diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs
index cd9414ba6..7c064cdcf 100644
--- a/core/src/layers/mod.rs
+++ b/core/src/layers/mod.rs
@@ -56,6 +56,11 @@ mod prometheus;
 #[cfg(feature = "layers-prometheus")]
 pub use self::prometheus::PrometheusLayer;
 
+#[cfg(feature = "layers-prometheus-client")]
+mod prometheus_client;
+#[cfg(feature = "layers-prometheus-client")]
+pub use self::prometheus_client::PrometheusClientLayer;
+
 mod retry;
 pub use self::retry::RetryInterceptor;
 pub use self::retry::RetryLayer;
diff --git a/core/src/layers/prometheus_client.rs 
b/core/src/layers/prometheus_client.rs
new file mode 100644
index 000000000..b08b50a28
--- /dev/null
+++ b/core/src/layers/prometheus_client.rs
@@ -0,0 +1,675 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::io;
+use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
+use std::time::{Duration, Instant};
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::FutureExt;
+use futures::TryFutureExt;
+use prometheus_client::metrics::counter::Counter;
+use prometheus_client::metrics::family::Family;
+use prometheus_client::metrics::histogram;
+use prometheus_client::metrics::histogram::Histogram;
+use prometheus_client::registry::Registry;
+
+use crate::raw::Accessor;
+use crate::raw::*;
+use crate::*;
+
+/// Add [prometheus](https://docs.rs/prometheus) for every operations.
+///
+/// # Examples
+///
+/// ```
+/// use log::debug;
+/// use log::info;
+/// use opendal::layers::PrometheusClientLayer;
+/// use opendal::services;
+/// use opendal::Operator;
+/// use opendal::Result;
+///
+/// /// Visit [`opendal::services`] for more service related config.
+/// /// Visit [`opendal::Operator`] for more operator level APIs.
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+///     // Pick a builder and configure it.
+///     let builder = services::Memory::default();
+///     let mut registry = prometheus_client::registry::Registry::default();
+///
+///     let op = Operator::new(builder)
+///         .expect("must init")
+///         .layer(PrometheusClientLayer::with_registry(&mut registry))
+///         .finish();
+///     debug!("operator: {op:?}");
+///
+///     // Write data into object test.
+///     op.write("test", "Hello, World!").await?;
+///     // Read data from object.
+///     let bs = op.read("test").await?;
+///     info!("content: {}", String::from_utf8_lossy(&bs));
+///
+///     // Get object metadata.
+///     let meta = op.stat("test").await?;
+///     info!("meta: {:?}", meta);
+///
+///     // Export prometheus metrics.
+///     let mut buf = String::new();
+///     prometheus_client::encoding::text::encode(&mut buf, 
&registry).unwrap();
+///     println!("## Prometheus Metrics");
+///     println!("{}", buf);
+///     Ok(())
+/// }
+/// ```
+#[derive(Debug)]
+pub struct PrometheusClientLayer {
+    metrics: PrometheusClientMetrics,
+}
+
+impl PrometheusClientLayer {
+    /// create PrometheusClientLayer while registering itself to this registry.
+    pub fn new(registry: &mut Registry) -> Self {
+        let metrics = PrometheusClientMetrics::register(registry);
+        Self { metrics }
+    }
+}
+
+impl<A: Accessor> Layer<A> for PrometheusClientLayer {
+    type LayeredAccessor = PrometheusAccessor<A>;
+
+    fn layer(&self, inner: A) -> Self::LayeredAccessor {
+        let meta = inner.info();
+        let scheme = meta.scheme();
+
+        let metrics = Arc::new(self.metrics.clone());
+        PrometheusAccessor {
+            inner,
+            metrics,
+            scheme,
+        }
+    }
+}
+
+type OperationLabels = [(&'static str, &'static str); 2];
+type ErrorLabels = [(&'static str, &'static str); 3];
+
+/// [`PrometheusClientMetrics`] provide the performance and IO metrics with 
the `prometheus-client` crate.
+#[derive(Debug, Clone)]
+struct PrometheusClientMetrics {
+    /// Total counter of the specific operation be called.
+    requests_total: Family<OperationLabels, Counter>,
+    /// Total counter of the errors.
+    errors_total: Family<ErrorLabels, Counter>,
+    /// Latency of the specific operation be called.
+    request_duration_seconds: Family<OperationLabels, Histogram>,
+    /// The histogram of bytes
+    bytes_histogram: Family<OperationLabels, Histogram>,
+}
+
+impl PrometheusClientMetrics {
+    pub fn register(registry: &mut Registry) -> Self {
+        let requests_total = Family::default();
+        let errors_total = Family::default();
+        let request_duration_seconds = Family::<OperationLabels, 
_>::new_with_constructor(|| {
+            let buckets = histogram::exponential_buckets(0.01, 2.0, 16);
+            Histogram::new(buckets)
+        });
+        let bytes_histogram = Family::<OperationLabels, 
_>::new_with_constructor(|| {
+            let buckets = histogram::exponential_buckets(1.0, 2.0, 16);
+            Histogram::new(buckets)
+        });
+
+        registry.register("opendal_requests_total", "", 
requests_total.clone());
+        registry.register("opendal_errors_total", "", errors_total.clone());
+        registry.register(
+            "opendal_request_duration_seconds",
+            "",
+            request_duration_seconds.clone(),
+        );
+        registry.register("opendal_bytes_histogram", "", 
bytes_histogram.clone());
+        Self {
+            requests_total,
+            errors_total,
+            request_duration_seconds,
+            bytes_histogram,
+        }
+    }
+
+    fn increment_errors_total(&self, scheme: Scheme, op: Operation, err: 
ErrorKind) {
+        let labels = [
+            ("scheme", scheme.into_static()),
+            ("op", op.into_static()),
+            ("err", err.into_static()),
+        ];
+        self.errors_total.get_or_create(&labels).inc();
+    }
+
+    fn increment_request_total(&self, scheme: Scheme, op: Operation) {
+        let labels = [("scheme", scheme.into_static()), ("op", 
op.into_static())];
+        self.requests_total.get_or_create(&labels).inc();
+    }
+
+    fn observe_bytes_total(&self, scheme: Scheme, op: Operation, bytes: usize) 
{
+        let labels = [("scheme", scheme.into_static()), ("op", 
op.into_static())];
+        self.bytes_histogram
+            .get_or_create(&labels)
+            .observe(bytes as f64);
+    }
+
+    fn observe_request_duration(&self, scheme: Scheme, op: Operation, 
duration: Duration) {
+        let labels = [("scheme", scheme.into_static()), ("op", 
op.into_static())];
+        self.request_duration_seconds
+            .get_or_create(&labels)
+            .observe(duration.as_secs_f64());
+    }
+}
+
+#[derive(Clone)]
+pub struct PrometheusAccessor<A: Accessor> {
+    inner: A,
+    metrics: Arc<PrometheusClientMetrics>,
+    scheme: Scheme,
+}
+
+impl<A: Accessor> Debug for PrometheusAccessor<A> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("PrometheusAccessor")
+            .field("inner", &self.inner)
+            .finish_non_exhaustive()
+    }
+}
+
+#[async_trait]
+impl<A: Accessor> LayeredAccessor for PrometheusAccessor<A> {
+    type Inner = A;
+    type Reader = PrometheusMetricWrapper<A::Reader>;
+    type BlockingReader = PrometheusMetricWrapper<A::BlockingReader>;
+    type Writer = PrometheusMetricWrapper<A::Writer>;
+    type BlockingWriter = PrometheusMetricWrapper<A::BlockingWriter>;
+    type Pager = A::Pager;
+    type BlockingPager = A::BlockingPager;
+
+    fn inner(&self) -> &Self::Inner {
+        &self.inner
+    }
+
+    async fn create_dir(&self, path: &str, args: OpCreateDir) -> 
Result<RpCreateDir> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::CreateDir);
+
+        let start_time = Instant::now();
+        let create_res = self.inner.create_dir(path, args).await;
+
+        self.metrics.observe_request_duration(
+            self.scheme,
+            Operation::CreateDir,
+            start_time.elapsed(),
+        );
+        create_res.map_err(|e| {
+            self.metrics
+                .increment_errors_total(self.scheme, Operation::CreateDir, 
e.kind());
+            e
+        })
+    }
+
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::Read);
+
+        let read_res = self
+            .inner
+            .read(path, args)
+            .map(|v| {
+                v.map(|(rp, r)| {
+                    (
+                        rp,
+                        PrometheusMetricWrapper::new(
+                            r,
+                            Operation::Read,
+                            self.metrics.clone(),
+                            self.scheme,
+                        ),
+                    )
+                })
+            })
+            .await;
+        read_res.map_err(|e| {
+            self.metrics
+                .increment_errors_total(self.scheme, Operation::Read, 
e.kind());
+            e
+        })
+    }
+
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::Write);
+
+        let write_res = self
+            .inner
+            .write(path, args)
+            .map(|v| {
+                v.map(|(rp, r)| {
+                    (
+                        rp,
+                        PrometheusMetricWrapper::new(
+                            r,
+                            Operation::Write,
+                            self.metrics.clone(),
+                            self.scheme,
+                        ),
+                    )
+                })
+            })
+            .await;
+
+        write_res.map_err(|e| {
+            self.metrics
+                .increment_errors_total(self.scheme, Operation::Write, 
e.kind());
+            e
+        })
+    }
+
+    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::Stat);
+        let start_time = Instant::now();
+
+        let stat_res = self
+            .inner
+            .stat(path, args)
+            .inspect_err(|e| {
+                self.metrics
+                    .increment_errors_total(self.scheme, Operation::Stat, 
e.kind());
+            })
+            .await;
+
+        self.metrics
+            .observe_request_duration(self.scheme, Operation::Stat, 
start_time.elapsed());
+        stat_res.map_err(|e| {
+            self.metrics
+                .increment_errors_total(self.scheme, Operation::Stat, 
e.kind());
+            e
+        })
+    }
+
+    async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::Delete);
+        let start_time = Instant::now();
+
+        let delete_res = self.inner.delete(path, args).await;
+
+        self.metrics
+            .observe_request_duration(self.scheme, Operation::Delete, 
start_time.elapsed());
+        delete_res.map_err(|e| {
+            self.metrics
+                .increment_errors_total(self.scheme, Operation::Delete, 
e.kind());
+            e
+        })
+    }
+
+    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Pager)> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::List);
+        let start_time = Instant::now();
+
+        let list_res = self.inner.list(path, args).await;
+
+        self.metrics
+            .observe_request_duration(self.scheme, Operation::List, 
start_time.elapsed());
+        list_res.map_err(|e| {
+            self.metrics
+                .increment_errors_total(self.scheme, Operation::List, 
e.kind());
+            e
+        })
+    }
+
+    async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::Batch);
+        let start_time = Instant::now();
+
+        let result = self.inner.batch(args).await;
+
+        self.metrics
+            .observe_request_duration(self.scheme, Operation::Batch, 
start_time.elapsed());
+        result.map_err(|e| {
+            self.metrics
+                .increment_errors_total(self.scheme, Operation::Batch, 
e.kind());
+            e
+        })
+    }
+
+    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::Presign);
+        let start_time = Instant::now();
+
+        let result = self.inner.presign(path, args).await;
+
+        self.metrics.observe_request_duration(
+            self.scheme,
+            Operation::Presign,
+            start_time.elapsed(),
+        );
+        result.map_err(|e| {
+            self.metrics
+                .increment_errors_total(self.scheme, Operation::Presign, 
e.kind());
+            e
+        })
+    }
+
+    fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> 
Result<RpCreateDir> {
+        self.metrics
+            .increment_request_total(self.scheme, 
Operation::BlockingCreateDir);
+        let start_time = Instant::now();
+
+        let result = self.inner.blocking_create_dir(path, args);
+
+        self.metrics.observe_request_duration(
+            self.scheme,
+            Operation::BlockingCreateDir,
+            start_time.elapsed(),
+        );
+        result.map_err(|e| {
+            self.metrics.increment_errors_total(
+                self.scheme,
+                Operation::BlockingCreateDir,
+                e.kind(),
+            );
+            e
+        })
+    }
+
+    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::BlockingReader)> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::BlockingRead);
+
+        let result = self.inner.blocking_read(path, args).map(|(rp, r)| {
+            (
+                rp,
+                PrometheusMetricWrapper::new(
+                    r,
+                    Operation::BlockingRead,
+                    self.metrics.clone(),
+                    self.scheme,
+                ),
+            )
+        });
+
+        result.map_err(|e| {
+            self.metrics
+                .increment_errors_total(self.scheme, Operation::BlockingRead, 
e.kind());
+            e
+        })
+    }
+
+    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::BlockingWriter)> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::BlockingWrite);
+
+        let result = self.inner.blocking_write(path, args).map(|(rp, r)| {
+            (
+                rp,
+                PrometheusMetricWrapper::new(
+                    r,
+                    Operation::BlockingWrite,
+                    self.metrics.clone(),
+                    self.scheme,
+                ),
+            )
+        });
+
+        result.map_err(|e| {
+            self.metrics
+                .increment_errors_total(self.scheme, Operation::BlockingWrite, 
e.kind());
+            e
+        })
+    }
+
+    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::BlockingStat);
+        let start_time = Instant::now();
+
+        let result = self.inner.blocking_stat(path, args);
+        self.metrics.observe_request_duration(
+            self.scheme,
+            Operation::BlockingStat,
+            start_time.elapsed(),
+        );
+
+        result.map_err(|e| {
+            self.metrics
+                .increment_errors_total(self.scheme, Operation::BlockingStat, 
e.kind());
+            e
+        })
+    }
+
+    fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::BlockingDelete);
+        let start_time = Instant::now();
+
+        let result = self.inner.blocking_delete(path, args);
+
+        self.metrics.observe_request_duration(
+            self.scheme,
+            Operation::BlockingDelete,
+            start_time.elapsed(),
+        );
+        result.map_err(|e| {
+            self.metrics
+                .increment_errors_total(self.scheme, 
Operation::BlockingDelete, e.kind());
+            e
+        })
+    }
+
+    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::BlockingPager)> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::BlockingList);
+        let start_time = Instant::now();
+
+        let result = self.inner.blocking_list(path, args);
+
+        self.metrics.observe_request_duration(
+            self.scheme,
+            Operation::BlockingList,
+            start_time.elapsed(),
+        );
+        result.map_err(|e| {
+            self.metrics
+                .increment_errors_total(self.scheme, Operation::BlockingList, 
e.kind());
+            e
+        })
+    }
+}
+
+pub struct PrometheusMetricWrapper<R> {
+    inner: R,
+
+    op: Operation,
+    metrics: Arc<PrometheusClientMetrics>,
+    scheme: Scheme,
+    bytes_total: usize,
+    start_time: Instant,
+}
+
+impl<R> PrometheusMetricWrapper<R> {
+    fn new(inner: R, op: Operation, metrics: Arc<PrometheusClientMetrics>, 
scheme: Scheme) -> Self {
+        Self {
+            inner,
+            op,
+            metrics,
+            scheme,
+            bytes_total: 0,
+            start_time: Instant::now(),
+        }
+    }
+}
+
+impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
+    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> 
Poll<Result<usize>> {
+        self.inner.poll_read(cx, buf).map(|res| match res {
+            Ok(bytes) => {
+                self.bytes_total += bytes;
+                Ok(bytes)
+            }
+            Err(e) => {
+                self.metrics
+                    .increment_errors_total(self.scheme, self.op, e.kind());
+                Err(e)
+            }
+        })
+    }
+
+    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) -> 
Poll<Result<u64>> {
+        self.inner.poll_seek(cx, pos).map(|res| match res {
+            Ok(n) => Ok(n),
+            Err(e) => {
+                self.metrics
+                    .increment_errors_total(self.scheme, self.op, e.kind());
+                Err(e)
+            }
+        })
+    }
+
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<Bytes>>> {
+        self.inner.poll_next(cx).map(|res| match res {
+            Some(Ok(bytes)) => {
+                self.bytes_total += bytes.len();
+                Some(Ok(bytes))
+            }
+            Some(Err(e)) => {
+                self.metrics
+                    .increment_errors_total(self.scheme, self.op, e.kind());
+                Some(Err(e))
+            }
+            None => None,
+        })
+    }
+}
+
+impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
+    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+        self.inner
+            .read(buf)
+            .map(|n| {
+                self.bytes_total += n;
+                n
+            })
+            .map_err(|e| {
+                self.metrics
+                    .increment_errors_total(self.scheme, self.op, e.kind());
+                e
+            })
+    }
+
+    fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
+        self.inner.seek(pos).map_err(|err| {
+            self.metrics
+                .increment_errors_total(self.scheme, self.op, err.kind());
+            err
+        })
+    }
+
+    fn next(&mut self) -> Option<Result<Bytes>> {
+        self.inner.next().map(|res| match res {
+            Ok(bytes) => {
+                self.bytes_total += bytes.len();
+                Ok(bytes)
+            }
+            Err(e) => {
+                self.metrics
+                    .increment_errors_total(self.scheme, self.op, e.kind());
+                Err(e)
+            }
+        })
+    }
+}
+
+#[async_trait]
+impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
+        self.inner
+            .poll_write(cx, bs)
+            .map_ok(|n| {
+                self.bytes_total += n;
+                n
+            })
+            .map_err(|err| {
+                self.metrics
+                    .increment_errors_total(self.scheme, self.op, err.kind());
+                err
+            })
+    }
+
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        self.inner.poll_abort(cx).map_err(|err| {
+            self.metrics
+                .increment_errors_total(self.scheme, self.op, err.kind());
+            err
+        })
+    }
+
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        self.inner.poll_close(cx).map_err(|err| {
+            self.metrics
+                .increment_errors_total(self.scheme, self.op, err.kind());
+            err
+        })
+    }
+}
+
+impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
+    fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        self.inner
+            .write(bs)
+            .map(|n| {
+                self.bytes_total += n;
+                n
+            })
+            .map_err(|err| {
+                self.metrics
+                    .increment_errors_total(self.scheme, self.op, err.kind());
+                err
+            })
+    }
+
+    fn close(&mut self) -> Result<()> {
+        self.inner.close().map_err(|err| {
+            self.metrics
+                .increment_errors_total(self.scheme, self.op, err.kind());
+            err
+        })
+    }
+}
+
+impl<R> Drop for PrometheusMetricWrapper<R> {
+    fn drop(&mut self) {
+        self.metrics
+            .observe_bytes_total(self.scheme, self.op, self.bytes_total);
+        self.metrics
+            .observe_request_duration(self.scheme, self.op, 
self.start_time.elapsed());
+    }
+}

Reply via email to