This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 03358b418 [#1234] improvement(rust): separate runtimes for different
overload (#1233)
03358b418 is described below
commit 03358b418356ed4e917d73a1acac7a1baeb990da
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Oct 16 10:04:02 2023 +0800
[#1234] improvement(rust): separate runtimes for different overload (#1233)
### What changes were proposed in this pull request?
In the current codebase, we use the same tokio runtime for the
grpc/http/reading/flushing services; this is not
stable under differing loads.
This PR introduces a runtime manager to separate them.
### Why are the changes needed?
For #1234: separate runtimes for different workloads
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UTs
---
.gitignore | 1 +
rust/experimental/server/Cargo.lock | 2 +
rust/experimental/server/Cargo.toml | 2 +
rust/experimental/server/src/app.rs | 19 +-
rust/experimental/server/src/config.rs | 38 +++-
rust/experimental/server/src/http/http_service.rs | 6 +-
rust/experimental/server/src/http/mod.rs | 4 +-
rust/experimental/server/src/lib.rs | 18 +-
rust/experimental/server/src/main.rs | 57 +++--
rust/experimental/server/src/metric.rs | 40 +++-
rust/experimental/server/src/runtime/manager.rs | 70 ++++++
rust/experimental/server/src/runtime/metrics.rs | 54 +++++
rust/experimental/server/src/runtime/mod.rs | 264 ++++++++++++++++++++++
rust/experimental/server/src/signal.rs | 46 ++++
rust/experimental/server/src/store/hybrid.rs | 130 ++++++-----
rust/experimental/server/src/store/localfile.rs | 141 ++++++++----
rust/experimental/server/src/store/memory.rs | 133 +++++++----
rust/experimental/server/src/store/mod.rs | 5 +-
rust/experimental/server/tests/write_read.rs | 1 +
19 files changed, 831 insertions(+), 200 deletions(-)
diff --git a/.gitignore b/.gitignore
index ac86fc6c4..359b8e83c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -48,4 +48,5 @@ deploy/kubernetes/integration-test/e2e/rss-controller.yaml
deploy/kubernetes/integration-test/e2e/rss-webhook.yaml
rust/experimental/server/target
rust/experimental/server/.idea
+rust/experimental/server/._target
rust/experimental/server/src/proto/uniffle.rs
diff --git a/rust/experimental/server/Cargo.lock
b/rust/experimental/server/Cargo.lock
index 172363b76..73344c2bb 100644
--- a/rust/experimental/server/Cargo.lock
+++ b/rust/experimental/server/Cargo.lock
@@ -3188,12 +3188,14 @@ dependencies = [
"hyper",
"log",
"once_cell",
+ "pin-project-lite",
"poem",
"pprof",
"prometheus",
"prost",
"prost-build",
"serde",
+ "signal-hook",
"tempdir",
"tempfile",
"thiserror",
diff --git a/rust/experimental/server/Cargo.toml
b/rust/experimental/server/Cargo.toml
index a71aa497d..9f03bd468 100644
--- a/rust/experimental/server/Cargo.toml
+++ b/rust/experimental/server/Cargo.toml
@@ -91,6 +91,8 @@ tower = { version = "0.4", features = ["util", "load-shed"] }
hyper = "0.14"
tokio-console = "0.1.8"
console-subscriber = "0.1.9"
+pin-project-lite = "0.2.8"
+signal-hook = "0.3.17"
[dependencies.hdrs]
version = "0.3.0"
diff --git a/rust/experimental/server/src/app.rs
b/rust/experimental/server/src/app.rs
index 9afbdeeeb..9c1151dfc 100644
--- a/rust/experimental/server/src/app.rs
+++ b/rust/experimental/server/src/app.rs
@@ -50,6 +50,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
+use crate::runtime::manager::RuntimeManager;
use tokio::sync::RwLock;
#[derive(Debug, Clone)]
@@ -356,10 +357,10 @@ pub struct AppManager {
}
impl AppManager {
- fn new(config: Config) -> Self {
+ fn new(runtime_manager: RuntimeManager, config: Config) -> Self {
let (sender, receiver) = async_channel::unbounded();
let app_heartbeat_timeout_min =
config.app_heartbeat_timeout_min.unwrap_or(10);
- let store = Arc::new(StoreProvider::get(config.clone()));
+ let store = Arc::new(StoreProvider::get(runtime_manager.clone(),
config.clone()));
store.clone().start();
let manager = AppManager {
apps: DashMap::new(),
@@ -374,11 +375,11 @@ impl AppManager {
}
impl AppManager {
- pub fn get_ref(config: Config) -> AppManagerRef {
- let app_ref = Arc::new(AppManager::new(config));
+ pub fn get_ref(runtime_manager: RuntimeManager, config: Config) ->
AppManagerRef {
+ let app_ref = Arc::new(AppManager::new(runtime_manager.clone(),
config));
let app_manager_ref_cloned = app_ref.clone();
- tokio::spawn(async move {
+ runtime_manager.default_runtime.spawn(async move {
info!("Starting app heartbeat checker...");
loop {
// task1: find out heartbeat timeout apps
@@ -409,7 +410,7 @@ impl AppManager {
});
let app_manager_cloned = app_ref.clone();
- tokio::spawn(async move {
+ runtime_manager.default_runtime.spawn(async move {
info!("Starting purge event handler...");
while let Ok(event) = app_manager_cloned.receiver.recv().await {
GAUGE_APP_NUMBER.dec();
@@ -587,7 +588,7 @@ mod test {
async fn app_put_get_purge_test() {
let app_id = "app_put_get_purge_test-----id";
- let app_manager_ref = AppManager::get_ref(mock_config()).clone();
+ let app_manager_ref = AppManager::get_ref(Default::default(),
mock_config()).clone();
app_manager_ref.register(app_id.clone().into(), 1).unwrap();
if let Some(app) = app_manager_ref.get_app("app_id".into()) {
@@ -653,7 +654,7 @@ mod test {
#[tokio::test]
async fn app_manager_test() {
- let app_manager_ref = AppManager::get_ref(mock_config()).clone();
+ let app_manager_ref = AppManager::get_ref(Default::default(),
mock_config()).clone();
app_manager_ref.register("app_id".into(), 1).unwrap();
if let Some(app) = app_manager_ref.get_app("app_id".into()) {
assert_eq!("app_id", app.app_id);
@@ -664,7 +665,7 @@ mod test {
async fn test_get_or_put_block_ids() {
let app_id = "test_get_or_put_block_ids-----id".to_string();
- let app_manager_ref = AppManager::get_ref(mock_config()).clone();
+ let app_manager_ref = AppManager::get_ref(Default::default(),
mock_config()).clone();
app_manager_ref.register(app_id.clone().into(), 1).unwrap();
let app = app_manager_ref.get_app(app_id.as_ref()).unwrap();
diff --git a/rust/experimental/server/src/config.rs
b/rust/experimental/server/src/config.rs
index 8a5dedc30..ed2864693 100644
--- a/rust/experimental/server/src/config.rs
+++ b/rust/experimental/server/src/config.rs
@@ -74,6 +74,30 @@ impl LocalfileStoreConfig {
// =========================================================
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+#[serde(default)]
+pub struct RuntimeConfig {
+ pub read_thread_num: usize,
+ pub write_thread_num: usize,
+ pub grpc_thread_num: usize,
+ pub http_thread_num: usize,
+ pub default_thread_num: usize,
+}
+
+impl Default for RuntimeConfig {
+ fn default() -> Self {
+ RuntimeConfig {
+ read_thread_num: 10,
+ write_thread_num: 10,
+ grpc_thread_num: 20,
+ http_thread_num: 5,
+ default_thread_num: 5,
+ }
+ }
+}
+
+// =========================================================
+
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct HybridStoreConfig {
pub memory_spill_high_watermark: f32,
@@ -112,6 +136,10 @@ impl Default for HybridStoreConfig {
}
}
+fn as_default_runtime_config() -> RuntimeConfig {
+ RuntimeConfig::default()
+}
+
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct Config {
pub memory_store: Option<MemoryStoreConfig>,
@@ -121,6 +149,9 @@ pub struct Config {
pub store_type: Option<StorageType>,
+ #[serde(default = "as_default_runtime_config")]
+ pub runtime_config: RuntimeConfig,
+
pub metrics: Option<MetricsConfig>,
pub grpc_port: Option<i32>,
@@ -221,7 +252,7 @@ impl Config {
#[cfg(test)]
mod test {
- use crate::config::{Config, StorageType};
+ use crate::config::{Config, RuntimeConfig, StorageType};
use crate::readable_size::ReadableSize;
use std::str::FromStr;
@@ -261,5 +292,10 @@ mod test {
let capacity =
ReadableSize::from_str(&decoded.memory_store.unwrap().capacity).unwrap();
assert_eq!(1024 * 1024 * 1024, capacity.as_bytes());
+
+ assert_eq!(
+ decoded.runtime_config.read_thread_num,
+ RuntimeConfig::default().read_thread_num
+ );
}
}
diff --git a/rust/experimental/server/src/http/http_service.rs
b/rust/experimental/server/src/http/http_service.rs
index 0b6fd86dc..5e83dcd5e 100644
--- a/rust/experimental/server/src/http/http_service.rs
+++ b/rust/experimental/server/src/http/http_service.rs
@@ -26,7 +26,7 @@ use poem::{get, Route, RouteMethod, Server};
use std::sync::Mutex;
use crate::http::{HTTPServer, Handler};
-
+use crate::runtime::manager::RuntimeManager;
impl ResponseError for WorkerError {
fn status(&self) -> StatusCode {
StatusCode::BAD_REQUEST
@@ -61,13 +61,13 @@ impl PoemHTTPServer {
}
impl HTTPServer for PoemHTTPServer {
- fn start(&self, port: u16) {
+ fn start(&self, runtime_manager: RuntimeManager, port: u16) {
let mut app = Route::new();
let handlers = self.handlers.lock().unwrap();
for handler in handlers.iter() {
app = app.at(handler.get_route_path(), handler.get_route_method());
}
- tokio::spawn(async move {
+ runtime_manager.http_runtime.spawn(async move {
let _ = Server::new(TcpListener::bind(format!("0.0.0.0:{}", port)))
.name("uniffle-server-http-service")
.run(app)
diff --git a/rust/experimental/server/src/http/mod.rs
b/rust/experimental/server/src/http/mod.rs
index b9a31887f..a5e121972 100644
--- a/rust/experimental/server/src/http/mod.rs
+++ b/rust/experimental/server/src/http/mod.rs
@@ -26,9 +26,9 @@ use crate::http::http_service::PoemHTTPServer;
use crate::http::jeprof::JeProfHandler;
use crate::http::metrics::MetricsHTTPHandler;
use crate::http::pprof::PProfHandler;
+use crate::runtime::manager::RuntimeManager;
use once_cell::sync::Lazy;
use poem::RouteMethod;
-
pub static HTTP_SERVICE: Lazy<Box<PoemHTTPServer>> = Lazy::new(||
new_server());
/// Implement the own handlers for concrete components
@@ -38,7 +38,7 @@ pub trait Handler {
}
pub trait HTTPServer: Send + Sync {
- fn start(&self, port: u16);
+ fn start(&self, runtime_manager: RuntimeManager, port: u16);
fn register_handler(&self, handler: impl Handler + 'static);
}
diff --git a/rust/experimental/server/src/lib.rs
b/rust/experimental/server/src/lib.rs
index b8946ce45..2ec1657b5 100644
--- a/rust/experimental/server/src/lib.rs
+++ b/rust/experimental/server/src/lib.rs
@@ -27,13 +27,15 @@ mod mem_allocator;
pub mod metric;
pub mod proto;
pub mod readable_size;
+pub mod runtime;
+pub mod signal;
pub mod store;
pub mod util;
use crate::app::AppManager;
use crate::grpc::DefaultShuffleServer;
use crate::http::{HTTPServer, HTTP_SERVICE};
-use crate::metric::configure_metric_service;
+use crate::metric::init_metric_service;
use crate::proto::uniffle::shuffle_server_client::ShuffleServerClient;
use crate::proto::uniffle::shuffle_server_server::ShuffleServerServer;
use crate::proto::uniffle::{
@@ -41,6 +43,7 @@ use crate::proto::uniffle::{
RequireBufferRequest, SendShuffleDataRequest, ShuffleBlock, ShuffleData,
ShuffleRegisterRequest,
};
+use crate::runtime::manager::RuntimeManager;
use crate::util::gen_worker_uid;
use anyhow::Result;
use bytes::{Buf, Bytes, BytesMut};
@@ -53,13 +56,18 @@ pub async fn start_uniffle_worker(config: config::Config)
-> Result<()> {
let rpc_port = config.grpc_port.unwrap_or(19999);
let worker_uid = gen_worker_uid(rpc_port);
let metric_config = config.metrics.clone();
- configure_metric_service(&metric_config, worker_uid.clone());
+
+ let runtime_manager = RuntimeManager::from(config.runtime_config.clone());
+
+ init_metric_service(runtime_manager.clone(), &metric_config,
worker_uid.clone());
// start the http monitor service
let http_port = config.http_monitor_service_port.unwrap_or(20010);
- HTTP_SERVICE.start(http_port);
+ HTTP_SERVICE.start(runtime_manager.clone(), http_port);
+
// implement server startup
- tokio::spawn(async move {
- let app_manager_ref = AppManager::get_ref(config.clone());
+ let cloned_runtime_manager = runtime_manager.clone();
+ runtime_manager.grpc_runtime.spawn(async move {
+ let app_manager_ref = AppManager::get_ref(cloned_runtime_manager,
config.clone());
let rpc_port = config.grpc_port.unwrap_or(19999);
info!("Starting GRpc server with port:[{}] ......", rpc_port);
let shuffle_server = DefaultShuffleServer::from(app_manager_ref);
diff --git a/rust/experimental/server/src/main.rs
b/rust/experimental/server/src/main.rs
index af7a1bde0..5191e4084 100644
--- a/rust/experimental/server/src/main.rs
+++ b/rust/experimental/server/src/main.rs
@@ -18,19 +18,21 @@
#![feature(impl_trait_in_assoc_type)]
use crate::app::{AppManager, AppManagerRef};
+use crate::await_tree::AWAIT_TREE_REGISTRY;
use crate::config::{Config, LogConfig, RotationConfig};
use crate::grpc::grpc_middleware::AwaitTreeMiddlewareLayer;
use crate::grpc::{DefaultShuffleServer, MAX_CONNECTION_WINDOW_SIZE,
STREAM_WINDOW_SIZE};
use crate::http::{HTTPServer, HTTP_SERVICE};
-use crate::metric::configure_metric_service;
+use crate::metric::init_metric_service;
use crate::proto::uniffle::coordinator_server_client::CoordinatorServerClient;
use crate::proto::uniffle::shuffle_server_server::ShuffleServerServer;
use crate::proto::uniffle::{ShuffleServerHeartBeatRequest, ShuffleServerId};
+use crate::runtime::manager::RuntimeManager;
+use crate::signal::details::wait_for_signal;
use crate::util::{gen_worker_uid, get_local_ip};
+
use anyhow::Result;
use log::info;
-
-use crate::await_tree::AWAIT_TREE_REGISTRY;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;
use tonic::transport::{Channel, Server};
@@ -49,19 +51,22 @@ mod mem_allocator;
mod metric;
pub mod proto;
mod readable_size;
+pub mod runtime;
+pub mod signal;
pub mod store;
mod util;
const DEFAULT_SHUFFLE_SERVER_TAG: &str = "ss_v4";
-async fn schedule_coordinator_report(
+fn start_coordinator_report(
+ runtime_manager: RuntimeManager,
app_manager: AppManagerRef,
coordinator_quorum: Vec<String>,
grpc_port: i32,
tags: Vec<String>,
worker_uid: String,
) -> anyhow::Result<()> {
- tokio::spawn(async move {
+ runtime_manager.default_runtime.spawn(async move {
let ip = get_local_ip().unwrap().to_string();
info!("machine ip: {}", ip.clone());
@@ -160,10 +165,11 @@ fn init_log(log: &LogConfig) -> WorkerGuard {
_guard
}
-#[tokio::main]
-async fn main() -> Result<()> {
+fn main() -> Result<()> {
let config = Config::create_from_env();
+ let runtime_manager = RuntimeManager::from(config.runtime_config.clone());
+
// init log
let log_config = &config.log.clone().unwrap_or(Default::default());
let _guard = init_log(log_config);
@@ -172,26 +178,26 @@ async fn main() -> Result<()> {
let worker_uid = gen_worker_uid(rpc_port);
let metric_config = config.metrics.clone();
- configure_metric_service(&metric_config, worker_uid.clone());
+ init_metric_service(runtime_manager.clone(), &metric_config,
worker_uid.clone());
let coordinator_quorum = config.coordinator_quorum.clone();
let tags = config.tags.clone().unwrap_or(vec![]);
- let app_manager_ref = AppManager::get_ref(config.clone());
- let _ = schedule_coordinator_report(
+ let app_manager_ref = AppManager::get_ref(runtime_manager.clone(),
config.clone());
+ let _ = start_coordinator_report(
+ runtime_manager.clone(),
app_manager_ref.clone(),
coordinator_quorum,
rpc_port,
tags,
worker_uid,
- )
- .await;
+ );
let http_port = config.http_monitor_service_port.unwrap_or(20010);
info!(
"Starting http monitor service with port:[{}] ......",
http_port
);
- HTTP_SERVICE.start(http_port);
+ HTTP_SERVICE.start(runtime_manager.clone(), http_port);
info!("Starting GRpc server with port:[{}] ......", rpc_port);
let shuffle_server = DefaultShuffleServer::from(app_manager_ref);
@@ -199,16 +205,21 @@ async fn main() -> Result<()> {
let service = ShuffleServerServer::new(shuffle_server)
.max_decoding_message_size(usize::MAX)
.max_encoding_message_size(usize::MAX);
- Server::builder()
- .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
- .initial_stream_window_size(STREAM_WINDOW_SIZE)
- .tcp_nodelay(true)
- .layer(AwaitTreeMiddlewareLayer::new_optional(Some(
- AWAIT_TREE_REGISTRY.clone(),
- )))
- .add_service(service)
- .serve(addr)
- .await?;
+
+ let _grpc_service_handle = runtime_manager.grpc_runtime.spawn(async move {
+ Server::builder()
+ .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
+ .initial_stream_window_size(STREAM_WINDOW_SIZE)
+ .tcp_nodelay(true)
+ .layer(AwaitTreeMiddlewareLayer::new_optional(Some(
+ AWAIT_TREE_REGISTRY.clone(),
+ )))
+ .add_service(service)
+ .serve(addr)
+ .await
+ });
+
+ wait_for_signal();
Ok(())
}
diff --git a/rust/experimental/server/src/metric.rs
b/rust/experimental/server/src/metric.rs
index 8a7d811d6..e7aea859e 100644
--- a/rust/experimental/server/src/metric.rs
+++ b/rust/experimental/server/src/metric.rs
@@ -16,9 +16,13 @@
// under the License.
use crate::config::MetricsConfig;
+use crate::runtime::manager::RuntimeManager;
use log::{error, info};
use once_cell::sync::Lazy;
-use prometheus::{labels, Histogram, HistogramOpts, IntCounter, IntGauge,
Registry};
+use prometheus::{
+ labels, register_int_gauge_vec, Histogram, HistogramOpts, IntCounter,
IntGauge, IntGaugeVec,
+ Registry,
+};
use std::time::Duration;
const DEFAULT_BUCKETS: &[f64; 16] = &[
@@ -157,7 +161,33 @@ pub static TOTAL_HUGE_PARTITION_REQUIRE_BUFFER_FAILED:
Lazy<IntCounter> = Lazy::
.expect("metrics should be created")
});
+pub static GAUGE_RUNTIME_ALIVE_THREAD_NUM: Lazy<IntGaugeVec> = Lazy::new(|| {
+ register_int_gauge_vec!(
+ "runtime_thread_alive_gauge",
+ "alive thread number for runtime",
+ &["name"]
+ )
+ .unwrap()
+});
+
+pub static GAUGE_RUNTIME_IDLE_THREAD_NUM: Lazy<IntGaugeVec> = Lazy::new(|| {
+ register_int_gauge_vec!(
+ "runtime_thread_idle_gauge",
+ "idle thread number for runtime",
+ &["name"]
+ )
+ .unwrap()
+});
+
fn register_custom_metrics() {
+ REGISTRY
+ .register(Box::new(GAUGE_RUNTIME_ALIVE_THREAD_NUM.clone()))
+ .expect("");
+
+ REGISTRY
+ .register(Box::new(GAUGE_RUNTIME_IDLE_THREAD_NUM.clone()))
+ .expect("");
+
REGISTRY
.register(Box::new(TOTAL_RECEIVED_DATA.clone()))
.expect("total_received_data must be registered");
@@ -243,7 +273,11 @@ fn register_custom_metrics() {
.expect("grpc_get_memory_data_transport_time must be registered");
}
-pub fn configure_metric_service(metric_config: &Option<MetricsConfig>,
worker_uid: String) -> bool {
+pub fn init_metric_service(
+ runtime_manager: RuntimeManager,
+ metric_config: &Option<MetricsConfig>,
+ worker_uid: String,
+) -> bool {
if metric_config.is_none() {
return false;
}
@@ -258,7 +292,7 @@ pub fn configure_metric_service(metric_config:
&Option<MetricsConfig>, worker_ui
if let Some(ref _endpoint) = push_gateway_endpoint {
let push_interval_sec = cfg.push_interval_sec.unwrap_or(60);
- tokio::spawn(async move {
+ runtime_manager.default_runtime.spawn(async move {
info!("Starting prometheus metrics exporter...");
loop {
tokio::time::sleep(Duration::from_secs(push_interval_sec as
u64)).await;
diff --git a/rust/experimental/server/src/runtime/manager.rs
b/rust/experimental/server/src/runtime/manager.rs
new file mode 100644
index 000000000..0dbee3caa
--- /dev/null
+++ b/rust/experimental/server/src/runtime/manager.rs
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::config::RuntimeConfig;
+use crate::runtime::{Builder, RuntimeRef};
+use std::future::Future;
+use std::sync::Arc;
+
+#[derive(Clone, Debug)]
+pub struct RuntimeManager {
+ // for reading data
+ pub read_runtime: RuntimeRef,
+ // for writing data
+ pub write_runtime: RuntimeRef,
+ // for grpc service
+ pub grpc_runtime: RuntimeRef,
+ // for http monitor service
+ pub http_runtime: RuntimeRef,
+ // the default runtime for not important tasks.
+ // like the data purging/ heartbeat / metric push
+ pub default_runtime: RuntimeRef,
+}
+
+impl Default for RuntimeManager {
+ fn default() -> Self {
+ RuntimeManager::from(Default::default())
+ }
+}
+
+impl RuntimeManager {
+ pub fn from(config: RuntimeConfig) -> Self {
+ fn create_runtime(pool_size: usize, name: &str) -> RuntimeRef {
+ Arc::new(
+ Builder::default()
+ .worker_threads(pool_size as usize)
+ .thread_name(name)
+ .enable_all()
+ .build()
+ .unwrap(),
+ )
+ }
+
+ Self {
+ read_runtime: create_runtime(config.read_thread_num,
"read_thread_pool"),
+ write_runtime: create_runtime(config.write_thread_num,
"write_thread_pool"),
+ grpc_runtime: create_runtime(config.grpc_thread_num,
"grpc_thread_pool"),
+ http_runtime: create_runtime(config.http_thread_num,
"http_thread_pool"),
+ default_runtime: create_runtime(config.default_thread_num,
"default_thread_pool"),
+ }
+ }
+
+ // for test cases to wait the future
+ pub fn wait<F: Future>(&self, future: F) -> F::Output {
+ self.default_runtime.block_on(future)
+ }
+}
diff --git a/rust/experimental/server/src/runtime/metrics.rs
b/rust/experimental/server/src/runtime/metrics.rs
new file mode 100644
index 000000000..9a40edd25
--- /dev/null
+++ b/rust/experimental/server/src/runtime/metrics.rs
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metric::{GAUGE_RUNTIME_ALIVE_THREAD_NUM,
GAUGE_RUNTIME_IDLE_THREAD_NUM};
+use prometheus::IntGauge;
+
+#[derive(Debug)]
+pub struct Metrics {
+ pub thread_alive_gauge: IntGauge,
+ pub thread_idle_gauge: IntGauge,
+}
+
+impl Metrics {
+ pub fn new(name: &str) -> Self {
+ Self {
+ thread_alive_gauge:
GAUGE_RUNTIME_ALIVE_THREAD_NUM.with_label_values(&[name]),
+ thread_idle_gauge:
GAUGE_RUNTIME_IDLE_THREAD_NUM.with_label_values(&[name]),
+ }
+ }
+
+ #[inline]
+ pub fn on_thread_start(&self) {
+ self.thread_alive_gauge.inc();
+ }
+
+ #[inline]
+ pub fn on_thread_stop(&self) {
+ self.thread_alive_gauge.dec();
+ }
+
+ #[inline]
+ pub fn on_thread_park(&self) {
+ self.thread_idle_gauge.inc();
+ }
+
+ #[inline]
+ pub fn on_thread_unpark(&self) {
+ self.thread_idle_gauge.dec();
+ }
+}
diff --git a/rust/experimental/server/src/runtime/mod.rs
b/rust/experimental/server/src/runtime/mod.rs
new file mode 100644
index 000000000..257f29ffa
--- /dev/null
+++ b/rust/experimental/server/src/runtime/mod.rs
@@ -0,0 +1,264 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod manager;
+mod metrics;
+
+use crate::runtime::metrics::Metrics;
+use anyhow::anyhow;
+use pin_project_lite::pin_project;
+use std::fmt::Debug;
+use std::{
+ future::Future,
+ pin::Pin,
+ sync::Arc,
+ task::{Context, Poll},
+};
+use tokio::runtime::Builder as TokioRuntimeBuilder;
+use tokio::runtime::Runtime as TokioRuntime;
+use tokio::task::JoinHandle as TokioJoinHandle;
+
+pub type RuntimeRef = Arc<Runtime>;
+
+#[derive(Debug)]
+pub struct Runtime {
+ rt: TokioRuntime,
+ metrics: Arc<Metrics>,
+}
+
+impl Runtime {
+ pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ JoinHandle {
+ inner: self.rt.spawn(future),
+ }
+ }
+
+ pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
+ where
+ F: FnOnce() -> R + Send + 'static,
+ R: Send + 'static,
+ {
+ JoinHandle {
+ inner: self.rt.spawn_blocking(func),
+ }
+ }
+
+ pub fn block_on<F: Future>(&self, future: F) -> F::Output {
+ self.rt.block_on(future)
+ }
+
+ pub fn stats(&self) -> RuntimeStats {
+ RuntimeStats {
+ alive_thread_num: self.metrics.thread_alive_gauge.get(),
+ idle_thread_num: self.metrics.thread_idle_gauge.get(),
+ }
+ }
+}
+
+#[derive(Debug)]
+pub struct RuntimeStats {
+ pub alive_thread_num: i64,
+ pub idle_thread_num: i64,
+}
+
+pin_project! {
+ #[derive(Debug)]
+ pub struct JoinHandle<T> {
+ #[pin]
+ inner: TokioJoinHandle<T>,
+ }
+}
+
+impl<T> JoinHandle<T> {
+ pub fn abort(&self) {
+ self.inner.abort();
+ }
+}
+
+impl<T> Future for JoinHandle<T> {
+ type Output = Result<T, anyhow::Error>;
+
+ fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output>
{
+ let this = self.project();
+ this.inner
+ .poll(ctx)
+ .map_err(|_source| anyhow!("errors on polling for future."))
+ }
+}
+
+pub struct Builder {
+ thread_name: String,
+ builder: TokioRuntimeBuilder,
+}
+
+impl Default for Builder {
+ fn default() -> Self {
+ Self {
+ thread_name: "runtime-worker".to_string(),
+ builder: TokioRuntimeBuilder::new_multi_thread(),
+ }
+ }
+}
+
+fn with_metrics<F>(metrics: &Arc<Metrics>, f: F) -> impl Fn()
+where
+ F: Fn(&Arc<Metrics>) + 'static,
+{
+ let m = metrics.clone();
+ move || {
+ f(&m);
+ }
+}
+
+impl Builder {
+ /// Sets the number of worker threads the Runtime will use.
+ ///
+ /// This can be any number above 0
+ pub fn worker_threads(&mut self, val: usize) -> &mut Self {
+ self.builder.worker_threads(val);
+ self
+ }
+
+ /// Sets name of threads spawned by the Runtime thread pool
+ pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
+ self.thread_name = val.into();
+ self
+ }
+
+ /// Enable all feature of the underlying runtime
+ pub fn enable_all(&mut self) -> &mut Self {
+ self.builder.enable_all();
+ self
+ }
+
+ pub fn build(&mut self) -> anyhow::Result<Runtime> {
+ let metrics = Arc::new(Metrics::new(&self.thread_name));
+
+ let rt = self
+ .builder
+ .thread_name(self.thread_name.clone())
+ .on_thread_start(with_metrics(&metrics, |m| {
+ m.on_thread_start();
+ }))
+ .on_thread_stop(with_metrics(&metrics, |m| {
+ m.on_thread_stop();
+ }))
+ .on_thread_park(with_metrics(&metrics, |m| {
+ m.on_thread_park();
+ }))
+ .on_thread_unpark(with_metrics(&metrics, |m| {
+ m.on_thread_unpark();
+ }))
+ .build()?;
+
+ Ok(Runtime { rt, metrics })
+ }
+}
+
#[cfg(test)]
mod tests {
    use crate::runtime::{Builder, Runtime};
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    /// Build a named multi-thread runtime with `workers` worker threads.
    fn create_runtime(workers: usize, name: &str) -> Arc<Runtime> {
        let built = Builder::default()
            .worker_threads(workers)
            .thread_name(name)
            .enable_all()
            .build();
        assert!(built.is_ok());
        Arc::new(built.unwrap())
    }

    #[test]
    fn test_metrics() {
        let runtime = create_runtime(2, "test_metrics");
        // let both workers come up and park
        thread::sleep(Duration::from_millis(60));

        let stats = runtime.stats();
        assert_eq!(2, stats.idle_thread_num);
        assert_eq!(2, stats.alive_thread_num);

        runtime.spawn(async {
            thread::sleep(Duration::from_millis(1000));
        });

        // give the task a moment to start so one worker is un-parked
        thread::sleep(Duration::from_millis(50));

        let stats = runtime.stats();
        assert_eq!(2, stats.alive_thread_num);
        assert_eq!(1, stats.idle_thread_num);
    }

    #[test]
    fn test_nested_spawn() {
        let runtime = create_runtime(4, "test_nested_spawn");
        let rt = runtime.clone();

        // spawn tasks from inside a task running on the same runtime
        let handle = runtime.spawn(async move {
            let mut total = 0;
            for _ in 0..3 {
                total += rt
                    .spawn(async move {
                        thread::sleep(Duration::from_millis(50));
                        1
                    })
                    .await
                    .unwrap();
            }
            total
        });

        assert_eq!(3, runtime.block_on(handle).unwrap());
    }

    #[test]
    fn test_spawn_block() {
        let runtime = create_runtime(2, "test_spawn_block");

        let handle = runtime.spawn(async {
            thread::sleep(Duration::from_millis(50));
            1
        });

        assert_eq!(1, runtime.block_on(handle).unwrap());
    }

    #[test]
    fn test_spawn() {
        let runtime = create_runtime(2, "test_spawn");
        let rt = runtime.clone();

        let res = runtime.block_on(async {
            rt.spawn_blocking(move || {
                thread::sleep(Duration::from_millis(100));
                2
            })
            .await
        });
        assert_eq!(res.unwrap(), 2);
    }
}
diff --git a/rust/experimental/server/src/signal.rs
b/rust/experimental/server/src/signal.rs
new file mode 100644
index 000000000..aa96c3ec7
--- /dev/null
+++ b/rust/experimental/server/src/signal.rs
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[cfg(unix)]
+pub mod details {
+ use log::info;
+ use signal_hook::{consts::TERM_SIGNALS, iterator::Signals};
+
+ pub fn wait_for_signal() {
+ let mut sigs = Signals::new(TERM_SIGNALS).expect("Failed to register
signal handlers");
+
+ for signal in &mut sigs {
+ if TERM_SIGNALS.contains(&signal) {
+ info!("Received signal {}, stopping server...", signal);
+ break;
+ }
+ }
+ }
+}
+
+#[cfg(not(unix))]
+pub mod details {
+ use std::thread;
+ use std::time::Duration;
+
+ // todo: this should be handled elegantly.
+ pub fn wait_for_signal() {
+ while 1 {
+ thread::sleep(Duration::from_millis(20));
+ }
+ }
+}
diff --git a/rust/experimental/server/src/store/hybrid.rs
b/rust/experimental/server/src/store/hybrid.rs
index 6dca1b009..93dcc9016 100644
--- a/rust/experimental/server/src/store/hybrid.rs
+++ b/rust/experimental/server/src/store/hybrid.rs
@@ -50,6 +50,7 @@ use await_tree::InstrumentAwait;
use std::str::FromStr;
use std::sync::Arc;
+use crate::runtime::manager::RuntimeManager;
use tokio::sync::{Mutex, Semaphore};
trait PersistentStore: Store + Persistent + Send + Sync {}
@@ -79,6 +80,8 @@ pub struct HybridStore {
memory_watermark_flush_trigger_sender: async_channel::Sender<()>,
memory_watermark_flush_trigger_recv: async_channel::Receiver<()>,
+
+ runtime_manager: RuntimeManager,
}
struct SpillMessage {
@@ -90,7 +93,7 @@ unsafe impl Send for HybridStore {}
unsafe impl Sync for HybridStore {}
impl HybridStore {
- pub fn from(config: Config) -> Self {
+ pub fn from(config: Config, runtime_manager: RuntimeManager) -> Self {
let store_type = &config.store_type.unwrap_or(StorageType::MEMORY);
if !StorageType::contains_memory(&store_type) {
panic!("Storage type must contains memory.");
@@ -98,7 +101,8 @@ impl HybridStore {
let mut persistent_stores: VecDeque<Box<dyn PersistentStore>> =
VecDeque::with_capacity(2);
if StorageType::contains_localfile(&store_type) {
- let localfile_store =
LocalFileStore::from(config.localfile_store.unwrap());
+ let localfile_store =
+ LocalFileStore::from(config.localfile_store.unwrap(),
runtime_manager.clone());
persistent_stores.push_back(Box::new(localfile_store));
}
@@ -126,7 +130,10 @@ impl HybridStore {
let (watermark_flush_send, watermark_flush_recv) =
async_channel::unbounded();
let store = HybridStore {
- hot_store:
Box::new(MemoryStore::from(config.memory_store.unwrap())),
+ hot_store: Box::new(MemoryStore::from(
+ config.memory_store.unwrap(),
+ runtime_manager.clone(),
+ )),
warm_store: persistent_stores.pop_front(),
cold_store: persistent_stores.pop_front(),
config: hybrid_conf,
@@ -138,6 +145,7 @@ impl HybridStore {
memory_spill_max_concurrency,
memory_watermark_flush_trigger_sender: watermark_flush_send,
memory_watermark_flush_trigger_recv: watermark_flush_recv,
+ runtime_manager,
};
store
}
@@ -318,7 +326,7 @@ impl Store for HybridStore {
// the handler to accept watermark flush trigger
let hybrid_store = self.clone();
- tokio::spawn(async move {
+ self.runtime_manager.default_runtime.spawn(async move {
let store = hybrid_store.clone();
while let Ok(_) =
&store.memory_watermark_flush_trigger_recv.recv().await {
if let Err(e) = watermark_flush(store.clone()).await {
@@ -331,7 +339,7 @@ impl Store for HybridStore {
let store = self.clone();
let concurrency_limiter =
Arc::new(Semaphore::new(store.memory_spill_max_concurrency as
usize));
- tokio::spawn(async move {
+ self.runtime_manager.default_runtime.spawn(async move {
while let Ok(message) = store.memory_spill_recv.recv().await {
let await_root = await_tree_registry
.register(format!("hot->warm flush."))
@@ -344,33 +352,36 @@ impl Store for HybridStore {
TOTAL_MEMORY_SPILL_OPERATION.inc();
GAUGE_MEMORY_SPILL_OPERATION.inc();
let store_cloned = store.clone();
- tokio::spawn(await_root.instrument(async move {
- let mut size = 0u64;
- for block in &message.ctx.data_blocks {
- size += block.length as u64;
- }
- match store_cloned
- .memory_spill_to_persistent_store(message.ctx.clone(),
message.id)
- .await
- {
- Ok(msg) => {
-
store_cloned.hot_store.desc_to_in_flight_buffer_size(size);
- debug!("{}", msg)
+ store
+ .runtime_manager
+ .write_runtime
+ .spawn(await_root.instrument(async move {
+ let mut size = 0u64;
+ for block in &message.ctx.data_blocks {
+ size += block.length as u64;
}
- Err(error) => {
- TOTAL_MEMORY_SPILL_OPERATION_FAILED.inc();
- error!(
+ match store_cloned
+
.memory_spill_to_persistent_store(message.ctx.clone(), message.id)
+ .await
+ {
+ Ok(msg) => {
+
store_cloned.hot_store.desc_to_in_flight_buffer_size(size);
+ debug!("{}", msg)
+ }
+ Err(error) => {
+ TOTAL_MEMORY_SPILL_OPERATION_FAILED.inc();
+ error!(
"Errors on spill memory data to persistent
storage. error: {:#?}",
error
);
- // re-push to the queue to execute
- let _ =
store_cloned.memory_spill_send.send(message).await;
+ // re-push to the queue to execute
+ let _ =
store_cloned.memory_spill_send.send(message).await;
+ }
}
- }
- store_cloned.memory_spill_event_num.dec_by(1);
- GAUGE_MEMORY_SPILL_OPERATION.dec();
- drop(concurrency_guarder);
- }));
+ store_cloned.memory_spill_event_num.dec_by(1);
+ GAUGE_MEMORY_SPILL_OPERATION.dec();
+ drop(concurrency_guarder);
+ }));
}
});
}
@@ -535,6 +546,7 @@ mod tests {
use std::collections::VecDeque;
use std::sync::Arc;
+ use std::thread;
use std::time::Duration;
@@ -556,14 +568,16 @@ mod tests {
assert_eq!(false, is_apple(&Banana {}));
}
- #[tokio::test]
- async fn test_only_memory() {
+ #[test]
+ fn test_only_memory() {
let mut config = Config::default();
config.memory_store = Some(MemoryStoreConfig::new("20M".to_string()));
config.hybrid_store = Some(HybridStoreConfig::new(0.8, 0.2, None));
config.store_type = Some(StorageType::MEMORY);
- let store = HybridStore::from(config);
- assert_eq!(true, store.is_healthy().await.unwrap());
+ let store = HybridStore::from(config, Default::default());
+
+ let runtime = store.runtime_manager.clone();
+ assert_eq!(true, runtime.wait(store.is_healthy()).unwrap());
}
#[test]
@@ -599,7 +613,7 @@ mod tests {
// The hybrid store will flush the memory data to file when
// the data reaches the number of 4
- let store = Arc::new(HybridStore::from(config));
+ let store = Arc::new(HybridStore::from(config, Default::default()));
store
}
@@ -630,8 +644,8 @@ mod tests {
block_ids
}
- #[tokio::test]
- async fn single_buffer_spill_test() -> anyhow::Result<()> {
+ #[test]
+ fn single_buffer_spill_test() -> anyhow::Result<()> {
let data = b"hello world!";
let data_len = data.len();
@@ -641,33 +655,37 @@ mod tests {
);
store.clone().start();
+ let runtime = store.runtime_manager.clone();
+
let uid = PartitionedUId {
app_id: "1000".to_string(),
shuffle_id: 0,
partition_id: 0,
};
- let expected_block_ids =
- write_some_data(store.clone(), uid.clone(), data_len as i32, data,
100).await;
- tokio::time::sleep(Duration::from_secs(1)).await;
+ let expected_block_ids = runtime.wait(write_some_data(
+ store.clone(),
+ uid.clone(),
+ data_len as i32,
+ data,
+ 100,
+ ));
+
+ thread::sleep(Duration::from_secs(1));
// read from memory and then from localfile
- let response_data = store
- .get(ReadingViewContext {
- uid: uid.clone(),
- reading_options: MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1024 *
1024 * 1024),
- })
- .await?;
+ let response_data = runtime.wait(store.get(ReadingViewContext {
+ uid: uid.clone(),
+ reading_options: MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1024 * 1024
* 1024),
+ }))?;
let mut accepted_block_ids = vec![];
for segment in response_data.from_memory().shuffle_data_block_segments
{
accepted_block_ids.push(segment.block_id);
}
- let local_index_data = store
- .get_index(ReadingIndexViewContext {
- partition_id: uid.clone(),
- })
- .await?;
+ let local_index_data =
runtime.wait(store.get_index(ReadingIndexViewContext {
+ partition_id: uid.clone(),
+ }))?;
match local_index_data {
ResponseDataIndex::Local(index) => {
@@ -691,6 +709,7 @@ mod tests {
}
}
+ accepted_block_ids.sort();
assert_eq!(accepted_block_ids, expected_block_ids);
Ok(())
@@ -781,19 +800,26 @@ mod tests {
// then again.
}
- #[tokio::test]
- async fn test_insert_and_get_from_memory() {
+ #[test]
+ fn test_insert_and_get_from_memory() {
let data = b"hello world!";
let data_len = data.len();
let store = start_store(None, ((data_len * 1) as i64).to_string());
+ let runtime = store.runtime_manager.clone();
let uid = PartitionedUId {
app_id: "1000".to_string(),
shuffle_id: 0,
partition_id: 0,
};
- write_some_data(store.clone(), uid.clone(), data_len as i32, data,
4).await;
+ runtime.wait(write_some_data(
+ store.clone(),
+ uid.clone(),
+ data_len as i32,
+ data,
+ 4,
+ ));
let mut last_block_id = -1;
// read data one by one
for idx in 0..=10 {
@@ -805,7 +831,7 @@ mod tests {
),
};
- let read_data = store.get(reading_view_ctx).await;
+ let read_data = runtime.wait(store.get(reading_view_ctx));
if read_data.is_err() {
panic!();
}
diff --git a/rust/experimental/server/src/store/localfile.rs
b/rust/experimental/server/src/store/localfile.rs
index 2391a2642..e85745851 100644
--- a/rust/experimental/server/src/store/localfile.rs
+++ b/rust/experimental/server/src/store/localfile.rs
@@ -37,6 +37,7 @@ use dashmap::DashMap;
use log::{debug, error, info, warn};
+use crate::runtime::manager::RuntimeManager;
use std::io::SeekFrom;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
@@ -57,6 +58,8 @@ pub struct LocalFileStore {
partition_written_disk_map: DashMap<String, DashMap<i32, DashMap<i32,
Arc<LocalDisk>>>>,
partition_file_locks: DashMap<String, Arc<RwLock<()>>>,
healthy_check_min_disks: i32,
+
+ runtime_manager: RuntimeManager,
}
impl Persistent for LocalFileStore {}
@@ -65,20 +68,27 @@ unsafe impl Send for LocalFileStore {}
unsafe impl Sync for LocalFileStore {}
impl LocalFileStore {
+ // only for test cases
pub fn new(local_disks: Vec<String>) -> Self {
let mut local_disk_instances = vec![];
+ let runtime_manager: RuntimeManager = Default::default();
for path in local_disks {
- local_disk_instances.push(LocalDisk::new(path,
LocalDiskConfig::default()));
+ local_disk_instances.push(LocalDisk::new(
+ path,
+ LocalDiskConfig::default(),
+ runtime_manager.clone(),
+ ));
}
LocalFileStore {
local_disks: local_disk_instances,
partition_written_disk_map: DashMap::new(),
partition_file_locks: DashMap::new(),
healthy_check_min_disks: 1,
+ runtime_manager,
}
}
- pub fn from(localfile_config: LocalfileStoreConfig) -> Self {
+ pub fn from(localfile_config: LocalfileStoreConfig, runtime_manager:
RuntimeManager) -> Self {
let mut local_disk_instances = vec![];
for path in localfile_config.data_paths {
let config = LocalDiskConfig {
@@ -87,13 +97,14 @@ impl LocalFileStore {
max_concurrency:
localfile_config.disk_max_concurrency.unwrap_or(40),
};
- local_disk_instances.push(LocalDisk::new(path, config));
+ local_disk_instances.push(LocalDisk::new(path, config,
runtime_manager.clone()));
}
LocalFileStore {
local_disks: local_disk_instances,
partition_written_disk_map: DashMap::new(),
partition_file_locks: DashMap::new(),
healthy_check_min_disks:
localfile_config.healthy_check_min_disks.unwrap_or(1),
+ runtime_manager,
}
}
@@ -485,7 +496,7 @@ struct LocalDisk {
}
impl LocalDisk {
- fn new(path: String, config: LocalDiskConfig) -> Arc<Self> {
+ fn new(path: String, config: LocalDiskConfig, runtime_manager:
RuntimeManager) -> Arc<Self> {
create_directory_if_not_exists(&path);
let instance = LocalDisk {
base_path: path,
@@ -496,8 +507,9 @@ impl LocalDisk {
};
let instance = Arc::new(instance);
+ let runtime = runtime_manager.default_runtime.clone();
let cloned = instance.clone();
- tokio::spawn(async {
+ runtime.spawn(async {
info!(
"Starting the disk healthy checking, base path: {}",
&cloned.base_path
@@ -736,16 +748,20 @@ mod test {
use bytes::{Buf, Bytes, BytesMut};
use log::info;
+ use crate::runtime::manager::RuntimeManager;
use std::io::Read;
+ use std::thread;
use std::time::Duration;
- #[tokio::test]
- async fn purge_test() -> anyhow::Result<()> {
+ #[test]
+ fn purge_test() -> anyhow::Result<()> {
let temp_dir = tempdir::TempDir::new("test_local_store").unwrap();
let temp_path = temp_dir.path().to_str().unwrap().to_string();
println!("init local file path: {}", &temp_path);
let local_store = LocalFileStore::new(vec![temp_path.clone()]);
+ let runtime = local_store.runtime_manager.clone();
+
let app_id = "purge_test-app-id".to_string();
let uid = PartitionedUId {
app_id: app_id.clone(),
@@ -777,35 +793,36 @@ mod test {
],
};
- let insert_result = local_store.insert(writing_ctx).await;
+ let insert_result = runtime.wait(local_store.insert(writing_ctx));
if insert_result.is_err() {
println!("{:?}", insert_result.err());
panic!()
}
assert_eq!(
true,
- tokio::fs::try_exists(format!(
+ runtime.wait(tokio::fs::try_exists(format!(
"{}/{}/{}/partition-{}.data",
&temp_path, &app_id, "0", "0"
- ))
- .await?
+ )))?
);
- local_store.purge(app_id.clone()).await?;
+ runtime.wait(local_store.purge(app_id.clone()))?;
assert_eq!(
false,
- tokio::fs::try_exists(format!("{}/{}", &temp_path, &app_id)).await?
+ runtime.wait(tokio::fs::try_exists(format!("{}/{}", &temp_path,
&app_id)))?
);
Ok(())
}
- #[tokio::test]
- async fn local_store_test() {
+ #[test]
+ fn local_store_test() {
let temp_dir = tempdir::TempDir::new("test_local_store").unwrap();
let temp_path = temp_dir.path().to_str().unwrap().to_string();
info!("init local file path: {}", temp_path);
let mut local_store = LocalFileStore::new(vec![temp_path]);
+ let runtime = local_store.runtime_manager.clone();
+
let uid = PartitionedUId {
app_id: "100".to_string(),
shuffle_id: 0,
@@ -836,7 +853,7 @@ mod test {
],
};
- let insert_result = local_store.insert(writing_ctx).await;
+ let insert_result = runtime.wait(local_store.insert(writing_ctx));
if insert_result.is_err() {
println!("{:?}", insert_result.err());
panic!()
@@ -867,25 +884,29 @@ mod test {
}
// case1: read the one partition block data
- get_and_check_partitial_data(&mut local_store, uid.clone(), size as
i64, data).await;
+ runtime.wait(get_and_check_partitial_data(
+ &mut local_store,
+ uid.clone(),
+ size as i64,
+ data,
+ ));
// case2: read the complete block data
let mut expected = BytesMut::with_capacity(size * 2);
expected.extend_from_slice(data);
expected.extend_from_slice(data);
- get_and_check_partitial_data(
+ runtime.wait(get_and_check_partitial_data(
&mut local_store,
uid.clone(),
size as i64 * 2,
expected.freeze().as_ref(),
- )
- .await;
+ ));
// case3: get the index data
let reading_index_view_ctx = ReadingIndexViewContext {
partition_id: uid.clone(),
};
- let result = local_store.get_index(reading_index_view_ctx).await;
+ let result =
runtime.wait(local_store.get_index(reading_index_view_ctx));
if result.is_err() {
panic!()
}
@@ -913,65 +934,84 @@ mod test {
temp_dir.close().unwrap();
}
- #[tokio::test]
- async fn test_local_disk_delete_operation() {
+ #[test]
+ fn test_local_disk_delete_operation() {
let temp_dir =
tempdir::TempDir::new("test_local_disk_delete_operation-dir").unwrap();
let temp_path = temp_dir.path().to_str().unwrap().to_string();
println!("init the path: {}", &temp_path);
- let local_disk = LocalDisk::new(temp_path.clone(),
LocalDiskConfig::default());
+ let runtime: RuntimeManager = Default::default();
+ let local_disk = LocalDisk::new(
+ temp_path.clone(),
+ LocalDiskConfig::default(),
+ runtime.clone(),
+ );
let data = b"hello!";
- local_disk
- .write(Bytes::copy_from_slice(data), "a/b".to_string())
- .await
+ runtime
+ .wait(local_disk.write(Bytes::copy_from_slice(data),
"a/b".to_string()))
.unwrap();
assert_eq!(
true,
- tokio::fs::try_exists(format!("{}/{}", &temp_path,
"a/b".to_string()))
- .await
+ runtime
+ .wait(tokio::fs::try_exists(format!(
+ "{}/{}",
+ &temp_path,
+ "a/b".to_string()
+ )))
.unwrap()
);
- local_disk
- .delete("a/".to_string())
- .await
+ runtime
+ .wait(local_disk.delete("a/".to_string()))
.expect("TODO: panic message");
assert_eq!(
false,
- tokio::fs::try_exists(format!("{}/{}", &temp_path,
"a/b".to_string()))
- .await
+ runtime
+ .wait(tokio::fs::try_exists(format!(
+ "{}/{}",
+ &temp_path,
+ "a/b".to_string()
+ )))
.unwrap()
);
}
- #[tokio::test]
- async fn local_disk_corruption_healthy_check() {
+ #[test]
+ fn local_disk_corruption_healthy_check() {
let temp_dir = tempdir::TempDir::new("test_directory").unwrap();
let temp_path = temp_dir.path().to_str().unwrap().to_string();
- let local_disk = LocalDisk::new(temp_path.clone(),
LocalDiskConfig::create_mocked_config());
+ let local_disk = LocalDisk::new(
+ temp_path.clone(),
+ LocalDiskConfig::create_mocked_config(),
+ Default::default(),
+ );
- tokio::time::sleep(Duration::from_secs(12)).await;
+ thread::sleep(Duration::from_secs(12));
assert_eq!(true, local_disk.is_healthy().unwrap());
assert_eq!(false, local_disk.is_corrupted().unwrap());
}
- #[tokio::test]
- async fn local_disk_test() {
+ #[test]
+ fn local_disk_test() {
let temp_dir = tempdir::TempDir::new("test_directory").unwrap();
let temp_path = temp_dir.path().to_str().unwrap().to_string();
- let local_disk = LocalDisk::new(temp_path.clone(),
LocalDiskConfig::default());
+ let runtime: RuntimeManager = Default::default();
+ let local_disk = LocalDisk::new(
+ temp_path.clone(),
+ LocalDiskConfig::default(),
+ runtime.clone(),
+ );
let data = b"Hello, World!";
let relative_path = "app-id/test_file.txt";
- let write_result = local_disk
- .write(Bytes::copy_from_slice(data), relative_path.to_string())
- .await;
+ let write_result =
+ runtime.wait(local_disk.write(Bytes::copy_from_slice(data),
relative_path.to_string()));
assert!(write_result.is_ok());
// test whether the content is written
@@ -982,14 +1022,15 @@ mod test {
assert_eq!(file_content, data);
// if the file has been created, append some content
- let write_result = local_disk
- .write(Bytes::copy_from_slice(data), relative_path.to_string())
- .await;
+ let write_result =
+ runtime.wait(local_disk.write(Bytes::copy_from_slice(data),
relative_path.to_string()));
assert!(write_result.is_ok());
- let read_result = local_disk
- .read(relative_path.to_string(), 0, Some(data.len() as i64 * 2))
- .await;
+ let read_result = runtime.wait(local_disk.read(
+ relative_path.to_string(),
+ 0,
+ Some(data.len() as i64 * 2),
+ ));
assert!(read_result.is_ok());
let read_data = read_result.unwrap();
let expected = b"Hello, World!Hello, World!";
diff --git a/rust/experimental/server/src/store/memory.rs
b/rust/experimental/server/src/store/memory.rs
index a380a7520..0d0852d0b 100644
--- a/rust/experimental/server/src/store/memory.rs
+++ b/rust/experimental/server/src/store/memory.rs
@@ -58,12 +58,14 @@ pub struct MemoryStore {
buffer_ticket_timeout_sec: i64,
buffer_ticket_check_interval_sec: i64,
in_flush_buffer_size: AtomicU64,
+ runtime_manager: RuntimeManager,
}
unsafe impl Send for MemoryStore {}
unsafe impl Sync for MemoryStore {}
impl MemoryStore {
+ // only for test cases
pub fn new(max_memory_size: i64) -> Self {
MemoryStore {
state: DashMap::new(),
@@ -73,10 +75,11 @@ impl MemoryStore {
buffer_ticket_timeout_sec: 5 * 60,
buffer_ticket_check_interval_sec: 10,
in_flush_buffer_size: Default::default(),
+ runtime_manager: Default::default(),
}
}
- pub fn from(conf: MemoryStoreConfig) -> Self {
+ pub fn from(conf: MemoryStoreConfig, runtime_manager: RuntimeManager) ->
Self {
let capacity = ReadableSize::from_str(&conf.capacity).unwrap();
MemoryStore {
state: DashMap::new(),
@@ -86,6 +89,7 @@ impl MemoryStore {
buffer_ticket_timeout_sec:
conf.buffer_ticket_timeout_sec.unwrap_or(5 * 60),
buffer_ticket_check_interval_sec: 10,
in_flush_buffer_size: Default::default(),
+ runtime_manager,
}
}
@@ -298,7 +302,7 @@ impl Store for MemoryStore {
fn start(self: Arc<Self>) {
// schedule check to find out the timeout allocated buffer ticket
let mem_store = self.clone();
- tokio::spawn(async move {
+ self.runtime_manager.default_runtime.spawn(async move {
loop {
mem_store.check_allocated_tickets().await;
delay_for(Duration::from_secs(
@@ -718,17 +722,19 @@ mod test {
use bytes::BytesMut;
use core::panic;
use std::sync::Arc;
+ use std::thread;
use std::time::Duration;
use crate::config::MemoryStoreConfig;
+ use crate::runtime::manager::RuntimeManager;
use anyhow::Result;
- use tokio::time::sleep as delay_for;
- #[tokio::test]
- async fn test_ticket_timeout() -> Result<()> {
+ #[test]
+ fn test_ticket_timeout() -> Result<()> {
let cfg = MemoryStoreConfig::from("2M".to_string(), 1);
+ let runtime_manager: RuntimeManager = Default::default();
+ let mut store = MemoryStore::from(cfg, runtime_manager.clone());
- let mut store = MemoryStore::from(cfg);
store.refresh_buffer_ticket_check_interval_sec(1);
let store = Arc::new(store);
@@ -736,69 +742,76 @@ mod test {
let app_id = "mocked-app-id";
let ctx =
RequireBufferContext::new(PartitionedUId::from(app_id.to_string(), 1, 1), 1000);
- let resp = store.require_buffer(ctx.clone()).await?;
+ let resp = runtime_manager.wait(store.require_buffer(ctx.clone()))?;
assert!(store.is_ticket_exist(app_id, resp.ticket_id));
- let snapshot = store.budget.snapshot().await;
+ let snapshot = runtime_manager.wait(store.budget.snapshot());
assert_eq!(snapshot.allocated, 1000);
assert_eq!(snapshot.used, 0);
- delay_for(Duration::from_secs(5)).await;
+ thread::sleep(Duration::from_secs(5));
assert!(!store.is_ticket_exist(app_id, resp.ticket_id));
- let snapshot = store.budget.snapshot().await;
+ let snapshot = runtime_manager.wait(store.budget.snapshot());
assert_eq!(snapshot.allocated, 0);
assert_eq!(snapshot.used, 0);
Ok(())
}
- #[tokio::test]
- async fn test_memory_buffer_ticket() -> Result<()> {
+ #[test]
+ fn test_memory_buffer_ticket() -> Result<()> {
let store = MemoryStore::new(1024 * 1000);
+ let runtime = store.runtime_manager.clone();
let app_id = "mocked-app-id";
let ctx =
RequireBufferContext::new(PartitionedUId::from(app_id.to_string(), 1, 1), 1000);
- let resp = store.require_buffer(ctx.clone()).await?;
+ let resp = runtime.wait(store.require_buffer(ctx.clone()))?;
let ticket_id_1 = resp.ticket_id;
- let resp = store.require_buffer(ctx.clone()).await?;
+ let resp = runtime.wait(store.require_buffer(ctx.clone()))?;
let ticket_id_2 = resp.ticket_id;
assert!(store.is_ticket_exist(app_id, ticket_id_1));
assert!(store.is_ticket_exist(app_id, ticket_id_2));
assert!(!store.is_ticket_exist(app_id, 100239));
- let snapshot = store.budget.snapshot().await;
+ let snapshot = runtime.wait(store.budget.snapshot());
assert_eq!(snapshot.allocated, 1000 * 2);
assert_eq!(snapshot.used, 0);
- store.purge(app_id.to_string()).await?;
+ runtime.wait(store.purge(app_id.to_string()))?;
- let snapshot = store.budget.snapshot().await;
+ let snapshot = runtime.wait(store.budget.snapshot());
assert_eq!(snapshot.allocated, 0);
assert_eq!(snapshot.used, 0);
Ok(())
}
- #[tokio::test]
- async fn test_read_buffer_in_flight() {
+ #[test]
+ fn test_read_buffer_in_flight() {
let store = MemoryStore::new(1024);
+ let runtime = store.runtime_manager.clone();
+
let uid = PartitionedUId {
app_id: "100".to_string(),
shuffle_id: 0,
partition_id: 0,
};
let writing_view_ctx = create_writing_ctx_with_blocks(10, 10,
uid.clone());
- let _ = store.insert(writing_view_ctx).await;
+ let _ = runtime.wait(store.insert(writing_view_ctx));
let default_single_read_size = 20;
// case1: read from -1
- let mem_data =
- get_data_with_last_block_id(default_single_read_size, -1, &store,
uid.clone()).await;
+ let mem_data = runtime.wait(get_data_with_last_block_id(
+ default_single_read_size,
+ -1,
+ &store,
+ uid.clone(),
+ ));
assert_eq!(2, mem_data.shuffle_data_block_segments.len());
assert_eq!(
0,
@@ -818,8 +831,12 @@ mod test {
);
// case2: when the last_block_id doesn't exist, it should return the
data like when last_block_id=-1
- let mem_data =
- get_data_with_last_block_id(default_single_read_size, 100, &store,
uid.clone()).await;
+ let mem_data = runtime.wait(get_data_with_last_block_id(
+ default_single_read_size,
+ 100,
+ &store,
+ uid.clone(),
+ ));
assert_eq!(2, mem_data.shuffle_data_block_segments.len());
assert_eq!(
0,
@@ -839,8 +856,12 @@ mod test {
);
// case3: read from 3
- let mem_data =
- get_data_with_last_block_id(default_single_read_size, 3, &store,
uid.clone()).await;
+ let mem_data = runtime.wait(get_data_with_last_block_id(
+ default_single_read_size,
+ 3,
+ &store,
+ uid.clone(),
+ ));
assert_eq!(2, mem_data.shuffle_data_block_segments.len());
assert_eq!(
4,
@@ -861,7 +882,7 @@ mod test {
// case4: some data are in inflight blocks
let buffer =
store.get_or_create_underlying_staging_buffer(uid.clone());
- let mut buffer = buffer.lock().await;
+ let mut buffer = runtime.wait(buffer.lock());
let owned = buffer.staging.to_owned();
buffer.staging.clear();
let mut idx = 0;
@@ -872,8 +893,12 @@ mod test {
drop(buffer);
// all data will be fetched from in_flight data
- let mem_data =
- get_data_with_last_block_id(default_single_read_size, 3, &store,
uid.clone()).await;
+ let mem_data = runtime.wait(get_data_with_last_block_id(
+ default_single_read_size,
+ 3,
+ &store,
+ uid.clone(),
+ ));
assert_eq!(2, mem_data.shuffle_data_block_segments.len());
assert_eq!(
4,
@@ -895,7 +920,7 @@ mod test {
// case5: old data in in_flight and latest data in staging.
// read it from the block id 9, and read size of 30
let buffer =
store.get_or_create_underlying_staging_buffer(uid.clone());
- let mut buffer = buffer.lock().await;
+ let mut buffer = runtime.wait(buffer.lock());
buffer.staging.push(PartitionedDataBlock {
block_id: 20,
length: 10,
@@ -906,7 +931,7 @@ mod test {
});
drop(buffer);
- let mem_data = get_data_with_last_block_id(30, 7, &store,
uid.clone()).await;
+ let mem_data = runtime.wait(get_data_with_last_block_id(30, 7, &store,
uid.clone()));
assert_eq!(3, mem_data.shuffle_data_block_segments.len());
assert_eq!(
8,
@@ -934,7 +959,7 @@ mod test {
);
// case6: read the end to return empty result
- let mem_data = get_data_with_last_block_id(30, 20, &store,
uid.clone()).await;
+ let mem_data = runtime.wait(get_data_with_last_block_id(30, 20,
&store, uid.clone()));
assert_eq!(0, mem_data.shuffle_data_block_segments.len());
}
@@ -980,9 +1005,11 @@ mod test {
WritingViewContext { uid, data_blocks }
}
- #[tokio::test]
- async fn test_allocated_and_purge_for_memory() {
+ #[test]
+ fn test_allocated_and_purge_for_memory() {
let store = MemoryStore::new(1024 * 1024 * 1024);
+ let runtime = store.runtime_manager.clone();
+
let ctx = RequireBufferContext {
uid: PartitionedUId {
app_id: "100".to_string(),
@@ -991,9 +1018,11 @@ mod test {
},
size: 10000,
};
- match store.require_buffer(ctx).await {
+ match runtime.default_runtime.block_on(store.require_buffer(ctx)) {
Ok(_) => {
- let _ = store.purge("100".to_string()).await;
+ let _ = runtime
+ .default_runtime
+ .block_on(store.purge("100".to_string()));
}
_ => panic!(),
}
@@ -1004,9 +1033,11 @@ mod test {
assert_eq!(1024 * 1024 * 1024, budget.capacity);
}
- #[tokio::test]
- async fn test_purge() -> Result<()> {
+ #[test]
+ fn test_purge() -> Result<()> {
let store = MemoryStore::new(1024);
+ let runtime = store.runtime_manager.clone();
+
let app_id = "purge_app";
let shuffle_id = 1;
let partition = 1;
@@ -1014,9 +1045,9 @@ mod test {
let uid = PartitionedUId::from(app_id.to_string(), shuffle_id,
partition);
// the buffer requested
- let _buffer = store
- .require_buffer(RequireBufferContext::new(uid.clone(), 40))
- .await
+
+ let _buffer = runtime
+ .wait(store.require_buffer(RequireBufferContext::new(uid.clone(),
40)))
.expect("");
let writing_ctx = WritingViewContext {
@@ -1030,31 +1061,33 @@ mod test {
task_attempt_id: 0,
}],
};
- store.insert(writing_ctx).await.expect("");
+ runtime.wait(store.insert(writing_ctx)).expect("");
let reading_ctx = ReadingViewContext {
uid: uid.clone(),
reading_options:
ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1000000),
};
- let data = store.get(reading_ctx.clone()).await.expect("");
+ let data = runtime.wait(store.get(reading_ctx.clone())).expect("");
assert_eq!(1, data.from_memory().shuffle_data_block_segments.len());
// purge
- store.purge(app_id.to_string()).await.expect("");
- let snapshot = store.budget.snapshot().await;
+ runtime.wait(store.purge(app_id.to_string())).expect("");
+ let snapshot = runtime.wait(store.budget.snapshot());
assert_eq!(snapshot.used, 0);
// the remaining allocated will be removed.
assert_eq!(snapshot.allocated, 0);
assert_eq!(snapshot.capacity, 1024);
- let data = store.get(reading_ctx.clone()).await.expect("");
+ let data = runtime.wait(store.get(reading_ctx.clone())).expect("");
assert_eq!(0, data.from_memory().shuffle_data_block_segments.len());
Ok(())
}
- #[tokio::test]
- async fn test_put_and_get_for_memory() {
+ #[test]
+ fn test_put_and_get_for_memory() {
let store = MemoryStore::new(1024 * 1024 * 1024);
+ let runtime = store.runtime_manager.clone();
+
let writing_ctx = WritingViewContext {
uid: Default::default(),
data_blocks: vec![
@@ -1076,14 +1109,14 @@ mod test {
},
],
};
- store.insert(writing_ctx).await.unwrap();
+ runtime.wait(store.insert(writing_ctx)).unwrap();
let reading_ctx = ReadingViewContext {
uid: Default::default(),
reading_options:
ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1000000),
};
- match store.get(reading_ctx).await.unwrap() {
+ match runtime.wait(store.get(reading_ctx)).unwrap() {
ResponseData::Mem(data) => {
assert_eq!(data.shuffle_data_block_segments.len(), 2);
assert_eq!(data.shuffle_data_block_segments.get(0).unwrap().offset, 0);
diff --git a/rust/experimental/server/src/store/mod.rs
b/rust/experimental/server/src/store/mod.rs
index 93b4bdd3f..9e23545b5 100644
--- a/rust/experimental/server/src/store/mod.rs
+++ b/rust/experimental/server/src/store/mod.rs
@@ -35,6 +35,7 @@ use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;
+use crate::runtime::manager::RuntimeManager;
use std::sync::Arc;
#[derive(Debug)]
@@ -178,7 +179,7 @@ pub trait Persistent {}
pub struct StoreProvider {}
impl StoreProvider {
- pub fn get(config: Config) -> HybridStore {
- HybridStore::from(config)
+ pub fn get(runtime_manager: RuntimeManager, config: Config) -> HybridStore
{
+ HybridStore::from(config, runtime_manager)
}
}
diff --git a/rust/experimental/server/tests/write_read.rs
b/rust/experimental/server/tests/write_read.rs
index b08ecc2b1..499005a2d 100644
--- a/rust/experimental/server/tests/write_read.rs
+++ b/rust/experimental/server/tests/write_read.rs
@@ -41,6 +41,7 @@ mod tests {
hybrid_store: Some(HybridStoreConfig::new(0.9, 0.5, None)),
hdfs_store: None,
store_type: Some(StorageType::MEMORY_LOCALFILE),
+ runtime_config: Default::default(),
metrics: Some(MetricsConfig {
push_gateway_endpoint: None,
push_interval_sec: None,