This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 682391610 [#1307] feat(rust): make each thread listen the socket to
improve throughput in tonic (#1306)
682391610 is described below
commit 6823916106fff19a5ce8414247bf8a63ecae0304
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Nov 10 16:59:19 2023 +0800
[#1307] feat(rust): make each thread listen the socket to improve
throughput in tonic (#1306)
### What changes were proposed in this pull request?
Make each gRPC service thread listen on its own socket to improve request
throughput.
### Why are the changes needed?
I have been looking for a way to improve request throughput when the write
concurrency is 400.
Few people discuss this problem, which left me puzzled until I found this comment:
https://github.com/hyperium/tonic/issues/1405#issuecomment-1805099818
Following this blog post:
https://medium.com/@fujita.tomonori/scalable-server-design-in-rust-with-tokio-4c81a5f350a3
I applied the same optimization and ran some TeraSort tests. The peak receive
data speed rose from 1.2G/s to 2.0G/s, so it works! Latency also dropped, and the
total processing time fell from 20 min to 8.5 min.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No need
---
rust/experimental/server/Cargo.lock | 1 +
rust/experimental/server/Cargo.toml | 1 +
rust/experimental/server/src/main.rs | 95 ++++++++++++++++++++++++----------
rust/experimental/server/src/signal.rs | 2 +-
4 files changed, 70 insertions(+), 29 deletions(-)
diff --git a/rust/experimental/server/Cargo.lock
b/rust/experimental/server/Cargo.lock
index 3d21a2437..ec8ed2930 100644
--- a/rust/experimental/server/Cargo.lock
+++ b/rust/experimental/server/Cargo.lock
@@ -3197,6 +3197,7 @@ dependencies = [
"prost-build",
"serde",
"signal-hook",
+ "socket2 0.4.9",
"tempdir",
"tempfile",
"thiserror",
diff --git a/rust/experimental/server/Cargo.toml
b/rust/experimental/server/Cargo.toml
index ba8f20ee5..62a784265 100644
--- a/rust/experimental/server/Cargo.toml
+++ b/rust/experimental/server/Cargo.toml
@@ -94,6 +94,7 @@ console-subscriber = "0.1.9"
pin-project-lite = "0.2.8"
signal-hook = "0.3.17"
clap = "3.0.14"
+socket2 = { version="0.4", features = ["all"]}
[dependencies.hdrs]
version = "0.3.0"
diff --git a/rust/experimental/server/src/main.rs
b/rust/experimental/server/src/main.rs
index 44eedc744..f8cc817c4 100644
--- a/rust/experimental/server/src/main.rs
+++ b/rust/experimental/server/src/main.rs
@@ -34,10 +34,11 @@ use crate::util::{gen_worker_uid, get_local_ip};
use anyhow::Result;
use clap::{App, Arg};
-use log::{error, info};
+use log::{debug, error, info};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;
-use tokio::sync::oneshot;
+use tokio::net::TcpListener;
+use tokio::sync::broadcast;
use tonic::transport::{Channel, Server};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::layer::SubscriberExt;
@@ -217,40 +218,78 @@ fn main() -> Result<()> {
);
HTTP_SERVICE.start(runtime_manager.clone(), http_port);
- let (tx, rx) = oneshot::channel::<()>();
+ let (tx, _) = broadcast::channel(1);
info!("Starting GRpc server with port:[{}] ......", rpc_port);
- let shuffle_server = DefaultShuffleServer::from(app_manager_ref);
- let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port
as u16);
- let service = ShuffleServerServer::new(shuffle_server)
- .max_decoding_message_size(usize::MAX)
- .max_encoding_message_size(usize::MAX);
-
- let _grpc_service_handle = runtime_manager.grpc_runtime.spawn(async move {
- Server::builder()
- .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
- .initial_stream_window_size(STREAM_WINDOW_SIZE)
- .tcp_nodelay(true)
- .layer(MetricsMiddlewareLayer::new(GRPC_LATENCY_TIME_SEC.clone()))
- .layer(AwaitTreeMiddlewareLayer::new_optional(Some(
- AWAIT_TREE_REGISTRY.clone(),
- )))
- .add_service(service)
- .serve_with_shutdown(addr, async {
- if let Err(err) = rx.await {
- error!("Errors on stopping the GRPC service, err: {:?}.",
err);
- } else {
- info!("GRPC service has been graceful stopped.");
- }
- })
- .await
- });
+
+ let available_cores = std::thread::available_parallelism()?;
+ debug!("GRpc service with parallelism: [{}]", &available_cores);
+
+ for _ in 0..available_cores.into() {
+ let shuffle_server =
DefaultShuffleServer::from(app_manager_ref.clone());
+ let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
rpc_port as u16);
+ let service = ShuffleServerServer::new(shuffle_server)
+ .max_decoding_message_size(usize::MAX)
+ .max_encoding_message_size(usize::MAX);
+ let service_tx = tx.subscribe();
+ std::thread::spawn(move || {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap()
+ .block_on(grpc_serve(service, addr, service_tx));
+ });
+ }
graceful_wait_for_signal(tx);
Ok(())
}
+/// Serves the shuffle gRPC service on its own listening socket until shutdown is signalled.
+///
+/// `main` calls this once per spawned thread, each on its own current-thread tokio
+/// runtime, so several sockets are bound to the same `addr`. That only succeeds because
+/// SO_REUSEPORT is set below; on Linux the kernel then distributes incoming connections
+/// across those sockets.
+///
+/// * `service` - the configured tonic service to serve.
+/// * `addr` - address to bind; its IPv4/IPv6 variant selects the socket domain.
+/// * `rx` - broadcast receiver; graceful shutdown starts once `recv()` completes.
+///
+/// # Panics
+///
+/// Panics (via `unwrap`) if the socket cannot be created, configured, bound or put into
+/// listening state, if the tokio listener cannot be built from it, or if the server
+/// returns an error. The thread spawned in `main` is never joined, so such a panic ends
+/// only that thread. NOTE(review): the process then keeps running with fewer (possibly
+/// zero) listeners; consider returning a `Result` so bind failures are surfaced.
+async fn grpc_serve(
+ service: ShuffleServerServer<DefaultShuffleServer>,
+ addr: SocketAddr,
+ mut rx: broadcast::Receiver<()>,
+) {
+ let sock = socket2::Socket::new(
+ match addr {
+ SocketAddr::V4(_) => socket2::Domain::IPV4,
+ SocketAddr::V6(_) => socket2::Domain::IPV6,
+ },
+ socket2::Type::STREAM,
+ None,
+ )
+ .unwrap();
+
+ // SO_REUSEADDR/SO_REUSEPORT allow every per-thread socket to bind the same port.
+ // NOTE(review): socket2 only exposes set_reuse_port on Unix-like targets (with the
+ // `all` feature, which Cargo.toml enables); confirm non-Unix builds are not expected.
+ sock.set_reuse_address(true).unwrap();
+ sock.set_reuse_port(true).unwrap();
+ // tokio's TcpListener::from_std requires the std listener to already be non-blocking.
+ sock.set_nonblocking(true).unwrap();
+ sock.bind(&addr.into()).unwrap();
+ // 8192 is the accept backlog passed to listen(2); the kernel may cap it
+ // (e.g. net.core.somaxconn on Linux).
+ sock.listen(8192).unwrap();
+
+ // Hand the bound socket to tonic as a stream of accepted connections.
+ let incoming =
+
tokio_stream::wrappers::TcpListenerStream::new(TcpListener::from_std(sock.into()).unwrap());
+
+ Server::builder()
+ .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
+ .initial_stream_window_size(STREAM_WINDOW_SIZE)
+ // NOTE(review): tonic appears to apply tcp_nodelay only to listeners it creates itself
+ // (serve / serve_with_shutdown). With a caller-supplied incoming stream this is likely
+ // a no-op; confirm, and set TCP_NODELAY on accepted streams if it is still wanted.
+ .tcp_nodelay(true)
+ .layer(MetricsMiddlewareLayer::new(GRPC_LATENCY_TIME_SEC.clone()))
+ .layer(AwaitTreeMiddlewareLayer::new_optional(Some(
+ AWAIT_TREE_REGISTRY.clone(),
+ )))
+ .add_service(service)
+ // Both branches complete the shutdown future. An Err from recv() (e.g. every Sender
+ // dropped, or the receiver lagged) is logged, but it still triggers shutdown.
+ .serve_with_incoming_shutdown(incoming, async {
+ if let Err(err) = rx.recv().await {
+ error!("Errors on stopping the GRPC service, err: {:?}.", err);
+ } else {
+ info!("GRPC service has been graceful stopped.");
+ }
+ })
+ .await
+ .unwrap();
+}
+
#[cfg(test)]
mod test {
use crate::get_local_ip;
diff --git a/rust/experimental/server/src/signal.rs
b/rust/experimental/server/src/signal.rs
index d34006557..da8b2faf6 100644
--- a/rust/experimental/server/src/signal.rs
+++ b/rust/experimental/server/src/signal.rs
@@ -31,7 +31,7 @@ pub mod details {
}
}
- pub fn graceful_wait_for_signal(tx: tokio::sync::oneshot::Sender<()>) {
+ pub fn graceful_wait_for_signal(tx: tokio::sync::broadcast::Sender<()>) {
let mut sigs = Signals::new(TERM_SIGNALS).expect("Failed to register
signal handlers");
for signal in &mut sigs {