This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 91a4fca38 [#1221] feat(rust): Support grpc server graceful shutdown
(#1292)
91a4fca38 is described below
commit 91a4fca38a6d685aa6ea1395ca3be552867c50fd
Author: wenlongbrother <[email protected]>
AuthorDate: Mon Nov 6 13:10:33 2023 +0800
[#1221] feat(rust): Support grpc server graceful shutdown (#1292)
### What changes were proposed in this pull request?
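Add graceful shutdown to the gRPC server: it is now started with tonic's `serve_with_shutdown` and stops serving once a termination signal is received.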
### Why are the changes needed?
**Data Integrity**: To ensure that any ongoing write operations or
transactions are properly handled to prevent data corruption or loss.
**Resource Cleanup**: To release resources such as file handles, network
connections, and memory to prevent resource leaks.
**User Experience**: To inform users, where possible, that the program is
shutting down, so that they are not caught off guard by a sudden service
interruption.
**State Preservation**: In some applications, it may be necessary to save
the current state so that the next start can continue from the same point.
**Dependency Service Coordination**: If other services depend on this one, a
graceful shutdown gives them a chance to be notified, which helps prevent
cascading failures.
**Error Tracking and Logging**: Recording sufficient information during the
shutdown process is very helpful for subsequent error analysis and
troubleshooting.
**Compliance with Laws and Regulations**: Some applications process
sensitive data and must stay compliant with legal and regulatory requirements,
including preventing data breaches during the shutdown process.
In distributed systems and microservices architectures, graceful shutdown
becomes particularly important, as the interactions between services are more
complex, and improper shutdowns can trigger a range of issues. For example, if
a database service shuts down abruptly without gracefully handling current
operations, it could lead to data inconsistencies that affect the stability of
the entire system.
Overall, graceful shutdown is about ensuring the robustness and reliability
of systems, maintaining service quality, and protecting data security even when
service termination is necessary.
### Does this PR introduce _any_ user-facing change?
No.
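### How was this patch tested?
Added the integration test `rust/experimental/server/tests/graceful_shutdown.rs`. Screenshots from testing: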
<img width="835" alt="image"
src="https://github.com/apache/incubator-uniffle/assets/50658515/34bccef1-dc81-4981-8a81-d5d89024f583">
<img width="906" alt="image"
src="https://github.com/apache/incubator-uniffle/assets/50658515/3c6ef339-f7e9-445d-8a04-ebc1115757d9">
---------
Co-authored-by: qingge <[email protected]>
---
rust/experimental/server/src/lib.rs | 22 ++++-
rust/experimental/server/src/main.rs | 17 +++-
rust/experimental/server/src/signal.rs | 12 +++
.../experimental/server/tests/graceful_shutdown.rs | 104 +++++++++++++++++++++
4 files changed, 150 insertions(+), 5 deletions(-)
diff --git a/rust/experimental/server/src/lib.rs
b/rust/experimental/server/src/lib.rs
index 2ec1657b5..4cf340392 100644
--- a/rust/experimental/server/src/lib.rs
+++ b/rust/experimental/server/src/lib.rs
@@ -50,6 +50,8 @@ use bytes::{Buf, Bytes, BytesMut};
use log::info;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;
+use tokio::signal::unix::{signal, SignalKind};
+use tokio::sync::oneshot;
use tonic::transport::{Channel, Server};
pub async fn start_uniffle_worker(config: config::Config) -> Result<()> {
@@ -64,6 +66,8 @@ pub async fn start_uniffle_worker(config: config::Config) ->
Result<()> {
let http_port = config.http_monitor_service_port.unwrap_or(20010);
HTTP_SERVICE.start(runtime_manager.clone(), http_port);
+ let (tx, rx) = oneshot::channel::<()>();
+
// implement server startup
let cloned_runtime_manager = runtime_manager.clone();
runtime_manager.grpc_runtime.spawn(async move {
@@ -75,8 +79,24 @@ pub async fn start_uniffle_worker(config: config::Config) ->
Result<()> {
let service = ShuffleServerServer::new(shuffle_server)
.max_decoding_message_size(usize::MAX)
.max_encoding_message_size(usize::MAX);
- let _ = Server::builder().add_service(service).serve(addr).await;
+ let _ = Server::builder()
+ .add_service(service)
+ .serve_with_shutdown(addr, async {
+ rx.await.expect("graceful_shutdown fail");
+ println!("Successfully received the shutdown signal.");
+ })
+ .await;
+ });
+
+ runtime_manager.default_runtime.spawn(async move {
+ let _ = signal(SignalKind::terminate())
+ .expect("Failed to register signal handlers")
+ .recv()
+ .await;
+
+ let _ = tx.send(());
});
+
Ok(())
}
diff --git a/rust/experimental/server/src/main.rs
b/rust/experimental/server/src/main.rs
index e2399dd22..44eedc744 100644
--- a/rust/experimental/server/src/main.rs
+++ b/rust/experimental/server/src/main.rs
@@ -29,14 +29,15 @@ use
crate::proto::uniffle::coordinator_server_client::CoordinatorServerClient;
use crate::proto::uniffle::shuffle_server_server::ShuffleServerServer;
use crate::proto::uniffle::{ShuffleServerHeartBeatRequest, ShuffleServerId};
use crate::runtime::manager::RuntimeManager;
-use crate::signal::details::wait_for_signal;
+use crate::signal::details::graceful_wait_for_signal;
use crate::util::{gen_worker_uid, get_local_ip};
use anyhow::Result;
use clap::{App, Arg};
-use log::info;
+use log::{error, info};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;
+use tokio::sync::oneshot;
use tonic::transport::{Channel, Server};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::layer::SubscriberExt;
@@ -216,6 +217,8 @@ fn main() -> Result<()> {
);
HTTP_SERVICE.start(runtime_manager.clone(), http_port);
+ let (tx, rx) = oneshot::channel::<()>();
+
info!("Starting GRpc server with port:[{}] ......", rpc_port);
let shuffle_server = DefaultShuffleServer::from(app_manager_ref);
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port
as u16);
@@ -233,11 +236,17 @@ fn main() -> Result<()> {
AWAIT_TREE_REGISTRY.clone(),
)))
.add_service(service)
- .serve(addr)
+ .serve_with_shutdown(addr, async {
+ if let Err(err) = rx.await {
+ error!("Errors on stopping the GRPC service, err: {:?}.",
err);
+ } else {
+ info!("GRPC service has been graceful stopped.");
+ }
+ })
.await
});
- wait_for_signal();
+ graceful_wait_for_signal(tx);
Ok(())
}
diff --git a/rust/experimental/server/src/signal.rs
b/rust/experimental/server/src/signal.rs
index aa96c3ec7..d34006557 100644
--- a/rust/experimental/server/src/signal.rs
+++ b/rust/experimental/server/src/signal.rs
@@ -30,6 +30,18 @@ pub mod details {
}
}
}
+
+    /// Blocks the calling thread until one of `TERM_SIGNALS` arrives, then asks the
+    /// gRPC server to shut down by sending `()` on `tx`.
+    ///
+    /// Returns right after the notification is sent; it does not wait for the
+    /// server to finish draining in-flight requests. If the receiving side has
+    /// already been dropped (e.g. the server task has exited), that is logged
+    /// instead of being silently ignored.
+    ///
+    /// # Panics
+    /// Panics if the signal handlers cannot be registered.
+    pub fn graceful_wait_for_signal(tx: tokio::sync::oneshot::Sender<()>) {
+        let mut sigs = Signals::new(TERM_SIGNALS).expect("Failed to register signal handlers");
+
+        // `forever()` only ends if the signal handle is closed; in that case `tx` is
+        // dropped without sending, which the receiver observes as an error.
+        if let Some(signal) = sigs.forever().find(|sig| TERM_SIGNALS.contains(sig)) {
+            info!("Received signal {}, stopping server...", signal);
+            if tx.send(()).is_err() {
+                info!("gRPC server already stopped; shutdown notification not delivered.");
+            }
+        }
+    }
}
#[cfg(not(unix))]
diff --git a/rust/experimental/server/tests/graceful_shutdown.rs
b/rust/experimental/server/tests/graceful_shutdown.rs
new file mode 100644
index 000000000..54f09046b
--- /dev/null
+++ b/rust/experimental/server/tests/graceful_shutdown.rs
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[cfg(test)]
+mod test {
+    use std::time::Duration;
+
+    use signal_hook::consts::SIGTERM;
+    use signal_hook::low_level::raise;
+
+    use uniffle_worker::config::{
+        Config, HybridStoreConfig, LocalfileStoreConfig, MemoryStoreConfig, MetricsConfig,
+        StorageType,
+    };
+    use uniffle_worker::proto::uniffle::shuffle_server_client::ShuffleServerClient;
+    use uniffle_worker::{start_uniffle_worker, write_read_for_one_time};
+
+    /// Builds a worker `Config` that uses the memory + local-file store type.
+    ///
+    /// * `grpc_port` - port for the worker's gRPC server.
+    /// * `capacity` - memory store capacity string, e.g. `"1G"`.
+    /// * `local_data_path` - the only data path of the local-file store.
+    fn create_mocked_config(grpc_port: i32, capacity: String, local_data_path: String) -> Config {
+        Config {
+            memory_store: Some(MemoryStoreConfig::new(capacity)),
+            localfile_store: Some(LocalfileStoreConfig {
+                data_paths: vec![local_data_path],
+                healthy_check_min_disks: Some(0),
+                disk_high_watermark: None,
+                disk_low_watermark: None,
+                disk_max_concurrency: None,
+            }),
+            hybrid_store: Some(HybridStoreConfig::new(0.9, 0.5, None)),
+            hdfs_store: None,
+            store_type: Some(StorageType::MEMORY_LOCALFILE),
+            runtime_config: Default::default(),
+            metrics: Some(MetricsConfig {
+                push_gateway_endpoint: None,
+                push_interval_sec: None,
+            }),
+            grpc_port: Some(grpc_port),
+            coordinator_quorum: vec![],
+            tags: None,
+            log: None,
+            app_heartbeat_timeout_min: None,
+            huge_partition_marked_threshold: None,
+            huge_partition_memory_max_used_percent: None,
+            http_monitor_service_port: None,
+        }
+    }
+
+    /// Starts an in-process worker on `port`, storing local data under `path`.
+    ///
+    /// Panics if `start_uniffle_worker` returns an error, so a startup failure
+    /// surfaces here instead of as a later connection error.
+    async fn start_embedded_worker(path: String, port: i32) {
+        let config = create_mocked_config(port, "1G".to_string(), path);
+        start_uniffle_worker(config)
+            .await
+            .expect("failed to start the embedded uniffle worker");
+        // The gRPC server and the SIGTERM listener are spawned onto background
+        // runtimes; give them time to come up. NOTE(review): a fixed sleep is a
+        // heuristic, not a readiness guarantee.
+        tokio::time::sleep(Duration::from_secs(1)).await;
+    }
+
+    /// Raises SIGTERM while a client talks to an embedded worker, and checks that
+    /// the test process survives and the client task does not panic.
+    ///
+    /// NOTE(review): the value returned by `write_read_for_one_time` is discarded,
+    /// and the signal is not ordered relative to the client's requests, so this
+    /// does not verify that in-flight requests complete during shutdown.
+    #[tokio::test]
+    async fn graceful_shutdown_test_with_embedded_worker_successfully_shutdown() {
+        let temp_dir = tempdir::TempDir::new("test_write_read").unwrap();
+        let temp_path = temp_dir.path().to_str().unwrap().to_string();
+        println!("created the temp file path: {}", &temp_path);
+
+        let port = 21101;
+        start_embedded_worker(temp_path, port).await;
+
+        let client = ShuffleServerClient::connect(format!("http://{}:{}", "0.0.0.0", port))
+            .await
+            .unwrap_or_else(|e| panic!("Failed to connect: {}", e));
+
+        let jh = tokio::spawn(async move { write_read_for_one_time(client).await });
+
+        // Raise SIGTERM against this very process. It is only non-fatal because the
+        // worker spawned a SIGTERM listener during start-up (presumably registered
+        // by now, thanks to the sleep above); otherwise the default action would
+        // terminate the test binary.
+        tokio::spawn(async {
+            raise(SIGTERM).expect("failed to raise shutdown signal");
+            eprintln!("successfully raised shutdown signal");
+        });
+
+        let _ = jh.await.expect("Task panicked or failed.");
+    }
+}