This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 682391610 [#1307] feat(rust): make each thread listen the socket to 
improve throughput in tonic (#1306)
682391610 is described below

commit 6823916106fff19a5ce8414247bf8a63ecae0304
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Nov 10 16:59:19 2023 +0800

    [#1307] feat(rust): make each thread listen the socket to improve 
throughput in tonic (#1306)
    
    ### What changes were proposed in this pull request?
    
    Make each gRPC service thread listen on the socket to improve request 
throughput.
    
    ### Why are the changes needed?
    
    I have been looking for a way to improve request throughput when the write 
concurrency is 400.
    I found that few people discuss this problem, which left me confused 
until I saw this comment: 
https://github.com/hyperium/tonic/issues/1405#issuecomment-1805099818 .
    
    Following this blog post 
https://medium.com/@fujita.tomonori/scalable-server-design-in-rust-with-tokio-4c81a5f350a3,
     I applied this optimization and then ran some terasort tests. The 
peak data receive speed rose from 1.2G/s to 2.0G/s, so it works! Latency 
also dropped, and the whole processing time fell from 20min to 8.5min.
    
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    No new automated tests; verified manually with the terasort test described above.
---
 rust/experimental/server/Cargo.lock    |  1 +
 rust/experimental/server/Cargo.toml    |  1 +
 rust/experimental/server/src/main.rs   | 95 ++++++++++++++++++++++++----------
 rust/experimental/server/src/signal.rs |  2 +-
 4 files changed, 70 insertions(+), 29 deletions(-)

diff --git a/rust/experimental/server/Cargo.lock 
b/rust/experimental/server/Cargo.lock
index 3d21a2437..ec8ed2930 100644
--- a/rust/experimental/server/Cargo.lock
+++ b/rust/experimental/server/Cargo.lock
@@ -3197,6 +3197,7 @@ dependencies = [
  "prost-build",
  "serde",
  "signal-hook",
+ "socket2 0.4.9",
  "tempdir",
  "tempfile",
  "thiserror",
diff --git a/rust/experimental/server/Cargo.toml 
b/rust/experimental/server/Cargo.toml
index ba8f20ee5..62a784265 100644
--- a/rust/experimental/server/Cargo.toml
+++ b/rust/experimental/server/Cargo.toml
@@ -94,6 +94,7 @@ console-subscriber = "0.1.9"
 pin-project-lite = "0.2.8"
 signal-hook = "0.3.17"
 clap = "3.0.14"
+socket2 = { version="0.4", features = ["all"]}
 
 [dependencies.hdrs]
 version = "0.3.0"
diff --git a/rust/experimental/server/src/main.rs 
b/rust/experimental/server/src/main.rs
index 44eedc744..f8cc817c4 100644
--- a/rust/experimental/server/src/main.rs
+++ b/rust/experimental/server/src/main.rs
@@ -34,10 +34,11 @@ use crate::util::{gen_worker_uid, get_local_ip};
 
 use anyhow::Result;
 use clap::{App, Arg};
-use log::{error, info};
+use log::{debug, error, info};
 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
 use std::time::Duration;
-use tokio::sync::oneshot;
+use tokio::net::TcpListener;
+use tokio::sync::broadcast;
 use tonic::transport::{Channel, Server};
 use tracing_appender::non_blocking::WorkerGuard;
 use tracing_subscriber::layer::SubscriberExt;
@@ -217,40 +218,78 @@ fn main() -> Result<()> {
     );
     HTTP_SERVICE.start(runtime_manager.clone(), http_port);
 
-    let (tx, rx) = oneshot::channel::<()>();
+    let (tx, _) = broadcast::channel(1);
 
     info!("Starting GRpc server with port:[{}] ......", rpc_port);
-    let shuffle_server = DefaultShuffleServer::from(app_manager_ref);
-    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port 
as u16);
-    let service = ShuffleServerServer::new(shuffle_server)
-        .max_decoding_message_size(usize::MAX)
-        .max_encoding_message_size(usize::MAX);
-
-    let _grpc_service_handle = runtime_manager.grpc_runtime.spawn(async move {
-        Server::builder()
-            .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
-            .initial_stream_window_size(STREAM_WINDOW_SIZE)
-            .tcp_nodelay(true)
-            .layer(MetricsMiddlewareLayer::new(GRPC_LATENCY_TIME_SEC.clone()))
-            .layer(AwaitTreeMiddlewareLayer::new_optional(Some(
-                AWAIT_TREE_REGISTRY.clone(),
-            )))
-            .add_service(service)
-            .serve_with_shutdown(addr, async {
-                if let Err(err) = rx.await {
-                    error!("Errors on stopping the GRPC service, err: {:?}.", 
err);
-                } else {
-                    info!("GRPC service has been graceful stopped.");
-                }
-            })
-            .await
-    });
+
+    let available_cores = std::thread::available_parallelism()?;
+    debug!("GRpc service with parallelism: [{}]", &available_cores);
+
+    for _ in 0..available_cores.into() {
+        let shuffle_server = 
DefaultShuffleServer::from(app_manager_ref.clone());
+        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 
rpc_port as u16);
+        let service = ShuffleServerServer::new(shuffle_server)
+            .max_decoding_message_size(usize::MAX)
+            .max_encoding_message_size(usize::MAX);
+        let service_tx = tx.subscribe();
+        std::thread::spawn(move || {
+            tokio::runtime::Builder::new_current_thread()
+                .enable_all()
+                .build()
+                .unwrap()
+                .block_on(grpc_serve(service, addr, service_tx));
+        });
+    }
 
     graceful_wait_for_signal(tx);
 
     Ok(())
 }
 
+async fn grpc_serve(
+    service: ShuffleServerServer<DefaultShuffleServer>,
+    addr: SocketAddr,
+    mut rx: broadcast::Receiver<()>,
+) {
+    let sock = socket2::Socket::new(
+        match addr {
+            SocketAddr::V4(_) => socket2::Domain::IPV4,
+            SocketAddr::V6(_) => socket2::Domain::IPV6,
+        },
+        socket2::Type::STREAM,
+        None,
+    )
+    .unwrap();
+
+    sock.set_reuse_address(true).unwrap();
+    sock.set_reuse_port(true).unwrap();
+    sock.set_nonblocking(true).unwrap();
+    sock.bind(&addr.into()).unwrap();
+    sock.listen(8192).unwrap();
+
+    let incoming =
+        
tokio_stream::wrappers::TcpListenerStream::new(TcpListener::from_std(sock.into()).unwrap());
+
+    Server::builder()
+        .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
+        .initial_stream_window_size(STREAM_WINDOW_SIZE)
+        .tcp_nodelay(true)
+        .layer(MetricsMiddlewareLayer::new(GRPC_LATENCY_TIME_SEC.clone()))
+        .layer(AwaitTreeMiddlewareLayer::new_optional(Some(
+            AWAIT_TREE_REGISTRY.clone(),
+        )))
+        .add_service(service)
+        .serve_with_incoming_shutdown(incoming, async {
+            if let Err(err) = rx.recv().await {
+                error!("Errors on stopping the GRPC service, err: {:?}.", err);
+            } else {
+                info!("GRPC service has been graceful stopped.");
+            }
+        })
+        .await
+        .unwrap();
+}
+
 #[cfg(test)]
 mod test {
     use crate::get_local_ip;
diff --git a/rust/experimental/server/src/signal.rs 
b/rust/experimental/server/src/signal.rs
index d34006557..da8b2faf6 100644
--- a/rust/experimental/server/src/signal.rs
+++ b/rust/experimental/server/src/signal.rs
@@ -31,7 +31,7 @@ pub mod details {
         }
     }
 
-    pub fn graceful_wait_for_signal(tx: tokio::sync::oneshot::Sender<()>) {
+    pub fn graceful_wait_for_signal(tx: tokio::sync::broadcast::Sender<()>) {
         let mut sigs = Signals::new(TERM_SIGNALS).expect("Failed to register 
signal handlers");
 
         for signal in &mut sigs {

Reply via email to