This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 19a20484f [#1294] feat(rust): introduce the unified grpc latency 
metrics for all requests (#1295)
19a20484f is described below

commit 19a20484ffa9a6849748d76c85246de077b8b881
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Nov 3 16:54:07 2023 +0800

    [#1294] feat(rust): introduce the unified grpc latency metrics for all 
requests (#1295)
    
    ### What changes were proposed in this pull request?
    
    Introduce unified gRPC latency metrics for all requests.
    
    ### Why are the changes needed?
    
    For #1294
    
    Currently, we calculate the request duration separately for each request
    in the service code, which is ugly and scattered everywhere.
    In addition, the current request duration metrics are limited to critical paths.
    So it is necessary to use tonic's layer mechanism to collect the gRPC
    latency metrics in one place.
    
    The original latency metrics are not deleted because the two sets of metrics
    measure different things: the latency recorded in the layer will be greater
    than the latency recorded in the service code, because a request spends some
    time decoding the message. The difference between the two is valuable for
    observability.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. After applying this patch, request latency metrics such as P99 can be
    obtained with a query like this:
    
    `histogram_quantile(0.99, sum(rate(grpc_duration_seconds_bucket{path="/rss.common.ShuffleServer/sendShuffleData", job="uniffle-worker"}[2m])) by (le))`
    
    ### How was this patch tested?
    
    Tested in the test environment and validated with the PromQL query shown above.
---
 rust/experimental/server/src/grpc.rs   | 73 +++++++++++++++++++++++++++++++++-
 rust/experimental/server/src/main.rs   |  6 ++-
 rust/experimental/server/src/metric.rs | 14 ++++++-
 3 files changed, 88 insertions(+), 5 deletions(-)

diff --git a/rust/experimental/server/src/grpc.rs 
b/rust/experimental/server/src/grpc.rs
index 42f515046..138e0f656 100644
--- a/rust/experimental/server/src/grpc.rs
+++ b/rust/experimental/server/src/grpc.rs
@@ -620,7 +620,78 @@ impl ShuffleServer for DefaultShuffleServer {
     }
 }
 
-pub mod grpc_middleware {
+pub mod metrics_middleware {
+    use hyper::service::Service;
+    use hyper::Body;
+    use prometheus::HistogramVec;
+    use std::task::{Context, Poll};
+    use tower::Layer;
+
+    #[derive(Clone)]
+    pub struct MetricsMiddlewareLayer {
+        metric: HistogramVec,
+    }
+
+    impl MetricsMiddlewareLayer {
+        pub fn new(metric: HistogramVec) -> Self {
+            Self { metric }
+        }
+    }
+
+    impl<S> Layer<S> for MetricsMiddlewareLayer {
+        type Service = MetricsMiddleware<S>;
+
+        fn layer(&self, service: S) -> Self::Service {
+            MetricsMiddleware {
+                inner: service,
+                metric: self.metric.clone(),
+            }
+        }
+    }
+
+    #[derive(Clone)]
+    pub struct MetricsMiddleware<S> {
+        inner: S,
+        metric: HistogramVec,
+    }
+
+    impl<S> Service<hyper::Request<Body>> for MetricsMiddleware<S>
+    where
+        S: Service<hyper::Request<Body>> + Clone + Send + 'static,
+        S::Future: Send + 'static,
+    {
+        type Response = S::Response;
+        type Error = S::Error;
+        type Future = futures::future::BoxFuture<'static, 
Result<Self::Response, Self::Error>>;
+
+        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), 
Self::Error>> {
+            self.inner.poll_ready(cx)
+        }
+
+        fn call(&mut self, req: hyper::Request<Body>) -> Self::Future {
+            // This is necessary because tonic internally uses 
`tower::buffer::Buffer`.
+            // See 
https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
+            // for details on why this is necessary
+            let clone = self.inner.clone();
+            let mut inner = std::mem::replace(&mut self.inner, clone);
+
+            let metrics = self.metric.clone();
+
+            Box::pin(async move {
+                let path = req.uri().path();
+                let timer = metrics.with_label_values(&[path]).start_timer();
+
+                let response = inner.call(req).await?;
+
+                timer.observe_duration();
+
+                Ok(response)
+            })
+        }
+    }
+}
+
+pub mod await_tree_middleware {
     use std::task::{Context, Poll};
 
     use crate::await_tree::AwaitTreeInner;
diff --git a/rust/experimental/server/src/main.rs 
b/rust/experimental/server/src/main.rs
index b92c453ac..e2399dd22 100644
--- a/rust/experimental/server/src/main.rs
+++ b/rust/experimental/server/src/main.rs
@@ -20,10 +20,11 @@
 use crate::app::{AppManager, AppManagerRef};
 use crate::await_tree::AWAIT_TREE_REGISTRY;
 use crate::config::{Config, LogConfig, RotationConfig};
-use crate::grpc::grpc_middleware::AwaitTreeMiddlewareLayer;
+use crate::grpc::await_tree_middleware::AwaitTreeMiddlewareLayer;
+use crate::grpc::metrics_middleware::MetricsMiddlewareLayer;
 use crate::grpc::{DefaultShuffleServer, MAX_CONNECTION_WINDOW_SIZE, 
STREAM_WINDOW_SIZE};
 use crate::http::{HTTPServer, HTTP_SERVICE};
-use crate::metric::init_metric_service;
+use crate::metric::{init_metric_service, GRPC_LATENCY_TIME_SEC};
 use crate::proto::uniffle::coordinator_server_client::CoordinatorServerClient;
 use crate::proto::uniffle::shuffle_server_server::ShuffleServerServer;
 use crate::proto::uniffle::{ShuffleServerHeartBeatRequest, ShuffleServerId};
@@ -227,6 +228,7 @@ fn main() -> Result<()> {
             .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
             .initial_stream_window_size(STREAM_WINDOW_SIZE)
             .tcp_nodelay(true)
+            .layer(MetricsMiddlewareLayer::new(GRPC_LATENCY_TIME_SEC.clone()))
             .layer(AwaitTreeMiddlewareLayer::new_optional(Some(
                 AWAIT_TREE_REGISTRY.clone(),
             )))
diff --git a/rust/experimental/server/src/metric.rs 
b/rust/experimental/server/src/metric.rs
index e7aea859e..22c91e7db 100644
--- a/rust/experimental/server/src/metric.rs
+++ b/rust/experimental/server/src/metric.rs
@@ -20,8 +20,8 @@ use crate::runtime::manager::RuntimeManager;
 use log::{error, info};
 use once_cell::sync::Lazy;
 use prometheus::{
-    labels, register_int_gauge_vec, Histogram, HistogramOpts, IntCounter, 
IntGauge, IntGaugeVec,
-    Registry,
+    histogram_opts, labels, register_histogram_vec_with_registry, 
register_int_gauge_vec,
+    Histogram, HistogramOpts, HistogramVec, IntCounter, IntGauge, IntGaugeVec, 
Registry,
 };
 use std::time::Duration;
 
@@ -90,6 +90,16 @@ pub static GRPC_BUFFER_REQUIRE_PROCESS_TIME: Lazy<Histogram> 
= Lazy::new(|| {
     histogram
 });
 
+pub static GRPC_LATENCY_TIME_SEC: Lazy<HistogramVec> = Lazy::new(|| {
+    let opts = histogram_opts!(
+        "grpc_duration_seconds",
+        "gRPC latency",
+        Vec::from(DEFAULT_BUCKETS as &'static [f64])
+    );
+    let grpc_latency = register_histogram_vec_with_registry!(opts, &["path"], 
REGISTRY).unwrap();
+    grpc_latency
+});
+
 pub static TOTAL_MEMORY_USED: Lazy<IntCounter> = Lazy::new(|| {
     IntCounter::new("total_memory_used", "Total memory used").expect("metric 
should be created")
 });

Reply via email to