This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 30de95ed6 feat: allow using `prometheus-client` crate with
PrometheusClientLayer (#3134)
30de95ed6 is described below
commit 30de95ed67581cf2265c053971800aef6f7fddc7
Author: flaneur <[email protected]>
AuthorDate: Tue Sep 19 20:04:42 2023 +0800
feat: allow using `prometheus-client` crate with PrometheusClientLayer
(#3134)
* refactor prometheus layer
* add prometheus-client to deps
* chore: simplify imports
* refactor the metrics into a trait
* feat: add implementation with prometheus-client
* fix: allow using different trait
* cargo fmt
* refactor: add a seperate layer
* fix: docs
* fix typo
* fix: cargo fmt
* add a prefix
* use structed labels
* use labels in array
* remove the unused metrics
* rename stats to metrics
* record request duration in wrapper
* fix fmt
---
Cargo.lock | 30 ++
core/Cargo.toml | 5 +-
core/src/layers/mod.rs | 5 +
core/src/layers/prometheus_client.rs | 675 +++++++++++++++++++++++++++++++++++
4 files changed, 714 insertions(+), 1 deletion(-)
diff --git a/Cargo.lock b/Cargo.lock
index 24bc8a76f..10e8741c7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1572,6 +1572,12 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ea835d29036a4087793836fa931b08837ad5e957da9e23886b29586fb9b6650"
+[[package]]
+name = "dtoa"
+version = "1.0.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dcbb2bf8e87535c23f7a8a321e364ce21462d0ff10cb6407820e8e96dfff6653"
+
[[package]]
name = "either"
version = "1.8.1"
@@ -3580,6 +3586,7 @@ dependencies = [
"pin-project",
"pretty_assertions",
"prometheus",
+ "prometheus-client",
"prost",
"quick-xml",
"rand 0.8.5",
@@ -4493,6 +4500,29 @@ dependencies = [
"thiserror",
]
+[[package]]
+name = "prometheus-client"
+version = "0.21.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3c99afa9a01501019ac3a14d71d9f94050346f55ca471ce90c799a15c58f61e2"
+dependencies = [
+ "dtoa",
+ "itoa",
+ "parking_lot 0.12.1",
+ "prometheus-client-derive-encode",
+]
+
+[[package]]
+name = "prometheus-client-derive-encode"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.23",
+]
+
[[package]]
name = "prost"
version = "0.11.9"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 33232299d..e09d94c51 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -83,8 +83,10 @@ layers-all = [
layers-chaos = ["dep:rand"]
# Enable layers metrics support
layers-metrics = ["dep:metrics"]
-# Enable layers prometheus support
+# Enable layers prometheus support, with tikv/prometheus-rs crate
layers-prometheus = ["dep:prometheus"]
+# Enable layers prometheus support, with prometheus-client crate
+layers-prometheus-client = ["dep:prometheus-client"]
# Enable layers madsim support
layers-madsim = ["dep:madsim"]
# Enable layers minitrace support.
@@ -243,6 +245,7 @@ percent-encoding = "2"
persy = { version = "1.4.4", optional = true }
pin-project = "1"
prometheus = { version = "0.13", features = ["process"], optional = true }
+prometheus-client = { version = "0.21.2", optional = true }
prost = { version = "0.11", optional = true }
quick-xml = { version = "0.29", features = ["serialize", "overlapped-lists"] }
rand = { version = "0.8", optional = true }
diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs
index cd9414ba6..7c064cdcf 100644
--- a/core/src/layers/mod.rs
+++ b/core/src/layers/mod.rs
@@ -56,6 +56,11 @@ mod prometheus;
#[cfg(feature = "layers-prometheus")]
pub use self::prometheus::PrometheusLayer;
+#[cfg(feature = "layers-prometheus-client")]
+mod prometheus_client;
+#[cfg(feature = "layers-prometheus-client")]
+pub use self::prometheus_client::PrometheusClientLayer;
+
mod retry;
pub use self::retry::RetryInterceptor;
pub use self::retry::RetryLayer;
diff --git a/core/src/layers/prometheus_client.rs
b/core/src/layers/prometheus_client.rs
new file mode 100644
index 000000000..b08b50a28
--- /dev/null
+++ b/core/src/layers/prometheus_client.rs
@@ -0,0 +1,675 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::io;
+use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
+use std::time::{Duration, Instant};
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::FutureExt;
+use futures::TryFutureExt;
+use prometheus_client::metrics::counter::Counter;
+use prometheus_client::metrics::family::Family;
+use prometheus_client::metrics::histogram;
+use prometheus_client::metrics::histogram::Histogram;
+use prometheus_client::registry::Registry;
+
+use crate::raw::Accessor;
+use crate::raw::*;
+use crate::*;
+
+/// Add [prometheus](https://docs.rs/prometheus) for every operations.
+///
+/// # Examples
+///
+/// ```
+/// use log::debug;
+/// use log::info;
+/// use opendal::layers::PrometheusClientLayer;
+/// use opendal::services;
+/// use opendal::Operator;
+/// use opendal::Result;
+///
+/// /// Visit [`opendal::services`] for more service related config.
+/// /// Visit [`opendal::Operator`] for more operator level APIs.
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+/// // Pick a builder and configure it.
+/// let builder = services::Memory::default();
+/// let mut registry = prometheus_client::registry::Registry::default();
+///
+/// let op = Operator::new(builder)
+/// .expect("must init")
+/// .layer(PrometheusClientLayer::with_registry(&mut registry))
+/// .finish();
+/// debug!("operator: {op:?}");
+///
+/// // Write data into object test.
+/// op.write("test", "Hello, World!").await?;
+/// // Read data from object.
+/// let bs = op.read("test").await?;
+/// info!("content: {}", String::from_utf8_lossy(&bs));
+///
+/// // Get object metadata.
+/// let meta = op.stat("test").await?;
+/// info!("meta: {:?}", meta);
+///
+/// // Export prometheus metrics.
+/// let mut buf = String::new();
+/// prometheus_client::encoding::text::encode(&mut buf,
®istry).unwrap();
+/// println!("## Prometheus Metrics");
+/// println!("{}", buf);
+/// Ok(())
+/// }
+/// ```
+#[derive(Debug)]
+pub struct PrometheusClientLayer {
+ metrics: PrometheusClientMetrics,
+}
+
+impl PrometheusClientLayer {
+ /// create PrometheusClientLayer while registering itself to this registry.
+ pub fn new(registry: &mut Registry) -> Self {
+ let metrics = PrometheusClientMetrics::register(registry);
+ Self { metrics }
+ }
+}
+
+impl<A: Accessor> Layer<A> for PrometheusClientLayer {
+ type LayeredAccessor = PrometheusAccessor<A>;
+
+ fn layer(&self, inner: A) -> Self::LayeredAccessor {
+ let meta = inner.info();
+ let scheme = meta.scheme();
+
+ let metrics = Arc::new(self.metrics.clone());
+ PrometheusAccessor {
+ inner,
+ metrics,
+ scheme,
+ }
+ }
+}
+
+type OperationLabels = [(&'static str, &'static str); 2];
+type ErrorLabels = [(&'static str, &'static str); 3];
+
+/// [`PrometheusClientMetrics`] provide the performance and IO metrics with
the `prometheus-client` crate.
+#[derive(Debug, Clone)]
+struct PrometheusClientMetrics {
+ /// Total counter of the specific operation be called.
+ requests_total: Family<OperationLabels, Counter>,
+ /// Total counter of the errors.
+ errors_total: Family<ErrorLabels, Counter>,
+ /// Latency of the specific operation be called.
+ request_duration_seconds: Family<OperationLabels, Histogram>,
+ /// The histogram of bytes
+ bytes_histogram: Family<OperationLabels, Histogram>,
+}
+
+impl PrometheusClientMetrics {
+ pub fn register(registry: &mut Registry) -> Self {
+ let requests_total = Family::default();
+ let errors_total = Family::default();
+ let request_duration_seconds = Family::<OperationLabels,
_>::new_with_constructor(|| {
+ let buckets = histogram::exponential_buckets(0.01, 2.0, 16);
+ Histogram::new(buckets)
+ });
+ let bytes_histogram = Family::<OperationLabels,
_>::new_with_constructor(|| {
+ let buckets = histogram::exponential_buckets(1.0, 2.0, 16);
+ Histogram::new(buckets)
+ });
+
+ registry.register("opendal_requests_total", "",
requests_total.clone());
+ registry.register("opendal_errors_total", "", errors_total.clone());
+ registry.register(
+ "opendal_request_duration_seconds",
+ "",
+ request_duration_seconds.clone(),
+ );
+ registry.register("opendal_bytes_histogram", "",
bytes_histogram.clone());
+ Self {
+ requests_total,
+ errors_total,
+ request_duration_seconds,
+ bytes_histogram,
+ }
+ }
+
+ fn increment_errors_total(&self, scheme: Scheme, op: Operation, err:
ErrorKind) {
+ let labels = [
+ ("scheme", scheme.into_static()),
+ ("op", op.into_static()),
+ ("err", err.into_static()),
+ ];
+ self.errors_total.get_or_create(&labels).inc();
+ }
+
+ fn increment_request_total(&self, scheme: Scheme, op: Operation) {
+ let labels = [("scheme", scheme.into_static()), ("op",
op.into_static())];
+ self.requests_total.get_or_create(&labels).inc();
+ }
+
+ fn observe_bytes_total(&self, scheme: Scheme, op: Operation, bytes: usize)
{
+ let labels = [("scheme", scheme.into_static()), ("op",
op.into_static())];
+ self.bytes_histogram
+ .get_or_create(&labels)
+ .observe(bytes as f64);
+ }
+
+ fn observe_request_duration(&self, scheme: Scheme, op: Operation,
duration: Duration) {
+ let labels = [("scheme", scheme.into_static()), ("op",
op.into_static())];
+ self.request_duration_seconds
+ .get_or_create(&labels)
+ .observe(duration.as_secs_f64());
+ }
+}
+
+#[derive(Clone)]
+pub struct PrometheusAccessor<A: Accessor> {
+ inner: A,
+ metrics: Arc<PrometheusClientMetrics>,
+ scheme: Scheme,
+}
+
+impl<A: Accessor> Debug for PrometheusAccessor<A> {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("PrometheusAccessor")
+ .field("inner", &self.inner)
+ .finish_non_exhaustive()
+ }
+}
+
+#[async_trait]
+impl<A: Accessor> LayeredAccessor for PrometheusAccessor<A> {
+ type Inner = A;
+ type Reader = PrometheusMetricWrapper<A::Reader>;
+ type BlockingReader = PrometheusMetricWrapper<A::BlockingReader>;
+ type Writer = PrometheusMetricWrapper<A::Writer>;
+ type BlockingWriter = PrometheusMetricWrapper<A::BlockingWriter>;
+ type Pager = A::Pager;
+ type BlockingPager = A::BlockingPager;
+
+ fn inner(&self) -> &Self::Inner {
+ &self.inner
+ }
+
+ async fn create_dir(&self, path: &str, args: OpCreateDir) ->
Result<RpCreateDir> {
+ self.metrics
+ .increment_request_total(self.scheme, Operation::CreateDir);
+
+ let start_time = Instant::now();
+ let create_res = self.inner.create_dir(path, args).await;
+
+ self.metrics.observe_request_duration(
+ self.scheme,
+ Operation::CreateDir,
+ start_time.elapsed(),
+ );
+ create_res.map_err(|e| {
+ self.metrics
+ .increment_errors_total(self.scheme, Operation::CreateDir,
e.kind());
+ e
+ })
+ }
+
+ async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::Reader)> {
+ self.metrics
+ .increment_request_total(self.scheme, Operation::Read);
+
+ let read_res = self
+ .inner
+ .read(path, args)
+ .map(|v| {
+ v.map(|(rp, r)| {
+ (
+ rp,
+ PrometheusMetricWrapper::new(
+ r,
+ Operation::Read,
+ self.metrics.clone(),
+ self.scheme,
+ ),
+ )
+ })
+ })
+ .await;
+ read_res.map_err(|e| {
+ self.metrics
+ .increment_errors_total(self.scheme, Operation::Read,
e.kind());
+ e
+ })
+ }
+
+ async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
+ self.metrics
+ .increment_request_total(self.scheme, Operation::Write);
+
+ let write_res = self
+ .inner
+ .write(path, args)
+ .map(|v| {
+ v.map(|(rp, r)| {
+ (
+ rp,
+ PrometheusMetricWrapper::new(
+ r,
+ Operation::Write,
+ self.metrics.clone(),
+ self.scheme,
+ ),
+ )
+ })
+ })
+ .await;
+
+ write_res.map_err(|e| {
+ self.metrics
+ .increment_errors_total(self.scheme, Operation::Write,
e.kind());
+ e
+ })
+ }
+
+ async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+ self.metrics
+ .increment_request_total(self.scheme, Operation::Stat);
+ let start_time = Instant::now();
+
+ let stat_res = self
+ .inner
+ .stat(path, args)
+ .inspect_err(|e| {
+ self.metrics
+ .increment_errors_total(self.scheme, Operation::Stat,
e.kind());
+ })
+ .await;
+
+ self.metrics
+ .observe_request_duration(self.scheme, Operation::Stat,
start_time.elapsed());
+ stat_res.map_err(|e| {
+ self.metrics
+ .increment_errors_total(self.scheme, Operation::Stat,
e.kind());
+ e
+ })
+ }
+
+ async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+ self.metrics
+ .increment_request_total(self.scheme, Operation::Delete);
+ let start_time = Instant::now();
+
+ let delete_res = self.inner.delete(path, args).await;
+
+ self.metrics
+ .observe_request_duration(self.scheme, Operation::Delete,
start_time.elapsed());
+ delete_res.map_err(|e| {
+ self.metrics
+ .increment_errors_total(self.scheme, Operation::Delete,
e.kind());
+ e
+ })
+ }
+
+ async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Pager)> {
+ self.metrics
+ .increment_request_total(self.scheme, Operation::List);
+ let start_time = Instant::now();
+
+ let list_res = self.inner.list(path, args).await;
+
+ self.metrics
+ .observe_request_duration(self.scheme, Operation::List,
start_time.elapsed());
+ list_res.map_err(|e| {
+ self.metrics
+ .increment_errors_total(self.scheme, Operation::List,
e.kind());
+ e
+ })
+ }
+
+ async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
+ self.metrics
+ .increment_request_total(self.scheme, Operation::Batch);
+ let start_time = Instant::now();
+
+ let result = self.inner.batch(args).await;
+
+ self.metrics
+ .observe_request_duration(self.scheme, Operation::Batch,
start_time.elapsed());
+ result.map_err(|e| {
+ self.metrics
+ .increment_errors_total(self.scheme, Operation::Batch,
e.kind());
+ e
+ })
+ }
+
+ async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
+ self.metrics
+ .increment_request_total(self.scheme, Operation::Presign);
+ let start_time = Instant::now();
+
+ let result = self.inner.presign(path, args).await;
+
+ self.metrics.observe_request_duration(
+ self.scheme,
+ Operation::Presign,
+ start_time.elapsed(),
+ );
+ result.map_err(|e| {
+ self.metrics
+ .increment_errors_total(self.scheme, Operation::Presign,
e.kind());
+ e
+ })
+ }
+
+ fn blocking_create_dir(&self, path: &str, args: OpCreateDir) ->
Result<RpCreateDir> {
+ self.metrics
+ .increment_request_total(self.scheme,
Operation::BlockingCreateDir);
+ let start_time = Instant::now();
+
+ let result = self.inner.blocking_create_dir(path, args);
+
+ self.metrics.observe_request_duration(
+ self.scheme,
+ Operation::BlockingCreateDir,
+ start_time.elapsed(),
+ );
+ result.map_err(|e| {
+ self.metrics.increment_errors_total(
+ self.scheme,
+ Operation::BlockingCreateDir,
+ e.kind(),
+ );
+ e
+ })
+ }
+
+ fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::BlockingReader)> {
+ self.metrics
+ .increment_request_total(self.scheme, Operation::BlockingRead);
+
+ let result = self.inner.blocking_read(path, args).map(|(rp, r)| {
+ (
+ rp,
+ PrometheusMetricWrapper::new(
+ r,
+ Operation::BlockingRead,
+ self.metrics.clone(),
+ self.scheme,
+ ),
+ )
+ });
+
+ result.map_err(|e| {
+ self.metrics
+ .increment_errors_total(self.scheme, Operation::BlockingRead,
e.kind());
+ e
+ })
+ }
+
+ fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::BlockingWriter)> {
+ self.metrics
+ .increment_request_total(self.scheme, Operation::BlockingWrite);
+
+ let result = self.inner.blocking_write(path, args).map(|(rp, r)| {
+ (
+ rp,
+ PrometheusMetricWrapper::new(
+ r,
+ Operation::BlockingWrite,
+ self.metrics.clone(),
+ self.scheme,
+ ),
+ )
+ });
+
+ result.map_err(|e| {
+ self.metrics
+ .increment_errors_total(self.scheme, Operation::BlockingWrite,
e.kind());
+ e
+ })
+ }
+
+ fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+ self.metrics
+ .increment_request_total(self.scheme, Operation::BlockingStat);
+ let start_time = Instant::now();
+
+ let result = self.inner.blocking_stat(path, args);
+ self.metrics.observe_request_duration(
+ self.scheme,
+ Operation::BlockingStat,
+ start_time.elapsed(),
+ );
+
+ result.map_err(|e| {
+ self.metrics
+ .increment_errors_total(self.scheme, Operation::BlockingStat,
e.kind());
+ e
+ })
+ }
+
+ fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+ self.metrics
+ .increment_request_total(self.scheme, Operation::BlockingDelete);
+ let start_time = Instant::now();
+
+ let result = self.inner.blocking_delete(path, args);
+
+ self.metrics.observe_request_duration(
+ self.scheme,
+ Operation::BlockingDelete,
+ start_time.elapsed(),
+ );
+ result.map_err(|e| {
+ self.metrics
+ .increment_errors_total(self.scheme,
Operation::BlockingDelete, e.kind());
+ e
+ })
+ }
+
+ fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::BlockingPager)> {
+ self.metrics
+ .increment_request_total(self.scheme, Operation::BlockingList);
+ let start_time = Instant::now();
+
+ let result = self.inner.blocking_list(path, args);
+
+ self.metrics.observe_request_duration(
+ self.scheme,
+ Operation::BlockingList,
+ start_time.elapsed(),
+ );
+ result.map_err(|e| {
+ self.metrics
+ .increment_errors_total(self.scheme, Operation::BlockingList,
e.kind());
+ e
+ })
+ }
+}
+
+pub struct PrometheusMetricWrapper<R> {
+ inner: R,
+
+ op: Operation,
+ metrics: Arc<PrometheusClientMetrics>,
+ scheme: Scheme,
+ bytes_total: usize,
+ start_time: Instant,
+}
+
+impl<R> PrometheusMetricWrapper<R> {
+ fn new(inner: R, op: Operation, metrics: Arc<PrometheusClientMetrics>,
scheme: Scheme) -> Self {
+ Self {
+ inner,
+ op,
+ metrics,
+ scheme,
+ bytes_total: 0,
+ start_time: Instant::now(),
+ }
+ }
+}
+
+impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
+ fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) ->
Poll<Result<usize>> {
+ self.inner.poll_read(cx, buf).map(|res| match res {
+ Ok(bytes) => {
+ self.bytes_total += bytes;
+ Ok(bytes)
+ }
+ Err(e) => {
+ self.metrics
+ .increment_errors_total(self.scheme, self.op, e.kind());
+ Err(e)
+ }
+ })
+ }
+
+ fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) ->
Poll<Result<u64>> {
+ self.inner.poll_seek(cx, pos).map(|res| match res {
+ Ok(n) => Ok(n),
+ Err(e) => {
+ self.metrics
+ .increment_errors_total(self.scheme, self.op, e.kind());
+ Err(e)
+ }
+ })
+ }
+
+ fn poll_next(&mut self, cx: &mut Context<'_>) ->
Poll<Option<Result<Bytes>>> {
+ self.inner.poll_next(cx).map(|res| match res {
+ Some(Ok(bytes)) => {
+ self.bytes_total += bytes.len();
+ Some(Ok(bytes))
+ }
+ Some(Err(e)) => {
+ self.metrics
+ .increment_errors_total(self.scheme, self.op, e.kind());
+ Some(Err(e))
+ }
+ None => None,
+ })
+ }
+}
+
+impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
+ fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+ self.inner
+ .read(buf)
+ .map(|n| {
+ self.bytes_total += n;
+ n
+ })
+ .map_err(|e| {
+ self.metrics
+ .increment_errors_total(self.scheme, self.op, e.kind());
+ e
+ })
+ }
+
+ fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
+ self.inner.seek(pos).map_err(|err| {
+ self.metrics
+ .increment_errors_total(self.scheme, self.op, err.kind());
+ err
+ })
+ }
+
+ fn next(&mut self) -> Option<Result<Bytes>> {
+ self.inner.next().map(|res| match res {
+ Ok(bytes) => {
+ self.bytes_total += bytes.len();
+ Ok(bytes)
+ }
+ Err(e) => {
+ self.metrics
+ .increment_errors_total(self.scheme, self.op, e.kind());
+ Err(e)
+ }
+ })
+ }
+}
+
+#[async_trait]
+impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
+ fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) ->
Poll<Result<usize>> {
+ self.inner
+ .poll_write(cx, bs)
+ .map_ok(|n| {
+ self.bytes_total += n;
+ n
+ })
+ .map_err(|err| {
+ self.metrics
+ .increment_errors_total(self.scheme, self.op, err.kind());
+ err
+ })
+ }
+
+ fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ self.inner.poll_abort(cx).map_err(|err| {
+ self.metrics
+ .increment_errors_total(self.scheme, self.op, err.kind());
+ err
+ })
+ }
+
+ fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ self.inner.poll_close(cx).map_err(|err| {
+ self.metrics
+ .increment_errors_total(self.scheme, self.op, err.kind());
+ err
+ })
+ }
+}
+
+impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
+ fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ self.inner
+ .write(bs)
+ .map(|n| {
+ self.bytes_total += n;
+ n
+ })
+ .map_err(|err| {
+ self.metrics
+ .increment_errors_total(self.scheme, self.op, err.kind());
+ err
+ })
+ }
+
+ fn close(&mut self) -> Result<()> {
+ self.inner.close().map_err(|err| {
+ self.metrics
+ .increment_errors_total(self.scheme, self.op, err.kind());
+ err
+ })
+ }
+}
+
+impl<R> Drop for PrometheusMetricWrapper<R> {
+ fn drop(&mut self) {
+ self.metrics
+ .observe_bytes_total(self.scheme, self.op, self.bytes_total);
+ self.metrics
+ .observe_request_duration(self.scheme, self.op,
self.start_time.elapsed());
+ }
+}