This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 03358b418 [#1234] improvement(rust): separate runtimes for different 
overload (#1233)
03358b418 is described below

commit 03358b418356ed4e917d73a1acac7a1baeb990da
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Oct 16 10:04:02 2023 +0800

    [#1234] improvement(rust): separate runtimes for different overload (#1233)
    
    ### What changes were proposed in this pull request?
    
    In the current codebase, we use the same tokio runtime for the
    grpc/http/reading/flushing services, which is not stable under
    different kinds of load.
    
    This PR introduces a runtime manager to separate them.
    
    ### Why are the changes needed?
    
    For #1234, separate runtimes for different kinds of workload.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    UTs
---
 .gitignore                                        |   1 +
 rust/experimental/server/Cargo.lock               |   2 +
 rust/experimental/server/Cargo.toml               |   2 +
 rust/experimental/server/src/app.rs               |  19 +-
 rust/experimental/server/src/config.rs            |  38 +++-
 rust/experimental/server/src/http/http_service.rs |   6 +-
 rust/experimental/server/src/http/mod.rs          |   4 +-
 rust/experimental/server/src/lib.rs               |  18 +-
 rust/experimental/server/src/main.rs              |  57 +++--
 rust/experimental/server/src/metric.rs            |  40 +++-
 rust/experimental/server/src/runtime/manager.rs   |  70 ++++++
 rust/experimental/server/src/runtime/metrics.rs   |  54 +++++
 rust/experimental/server/src/runtime/mod.rs       | 264 ++++++++++++++++++++++
 rust/experimental/server/src/signal.rs            |  46 ++++
 rust/experimental/server/src/store/hybrid.rs      | 130 ++++++-----
 rust/experimental/server/src/store/localfile.rs   | 141 ++++++++----
 rust/experimental/server/src/store/memory.rs      | 133 +++++++----
 rust/experimental/server/src/store/mod.rs         |   5 +-
 rust/experimental/server/tests/write_read.rs      |   1 +
 19 files changed, 831 insertions(+), 200 deletions(-)

diff --git a/.gitignore b/.gitignore
index ac86fc6c4..359b8e83c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -48,4 +48,5 @@ deploy/kubernetes/integration-test/e2e/rss-controller.yaml
 deploy/kubernetes/integration-test/e2e/rss-webhook.yaml
 rust/experimental/server/target
 rust/experimental/server/.idea
+rust/experimental/server/._target
 rust/experimental/server/src/proto/uniffle.rs
diff --git a/rust/experimental/server/Cargo.lock 
b/rust/experimental/server/Cargo.lock
index 172363b76..73344c2bb 100644
--- a/rust/experimental/server/Cargo.lock
+++ b/rust/experimental/server/Cargo.lock
@@ -3188,12 +3188,14 @@ dependencies = [
  "hyper",
  "log",
  "once_cell",
+ "pin-project-lite",
  "poem",
  "pprof",
  "prometheus",
  "prost",
  "prost-build",
  "serde",
+ "signal-hook",
  "tempdir",
  "tempfile",
  "thiserror",
diff --git a/rust/experimental/server/Cargo.toml 
b/rust/experimental/server/Cargo.toml
index a71aa497d..9f03bd468 100644
--- a/rust/experimental/server/Cargo.toml
+++ b/rust/experimental/server/Cargo.toml
@@ -91,6 +91,8 @@ tower = { version = "0.4", features = ["util", "load-shed"] }
 hyper = "0.14"
 tokio-console = "0.1.8"
 console-subscriber = "0.1.9"
+pin-project-lite = "0.2.8"
+signal-hook = "0.3.17"
 
 [dependencies.hdrs]
 version = "0.3.0"
diff --git a/rust/experimental/server/src/app.rs 
b/rust/experimental/server/src/app.rs
index 9afbdeeeb..9c1151dfc 100644
--- a/rust/experimental/server/src/app.rs
+++ b/rust/experimental/server/src/app.rs
@@ -50,6 +50,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 
+use crate::runtime::manager::RuntimeManager;
 use tokio::sync::RwLock;
 
 #[derive(Debug, Clone)]
@@ -356,10 +357,10 @@ pub struct AppManager {
 }
 
 impl AppManager {
-    fn new(config: Config) -> Self {
+    fn new(runtime_manager: RuntimeManager, config: Config) -> Self {
         let (sender, receiver) = async_channel::unbounded();
         let app_heartbeat_timeout_min = 
config.app_heartbeat_timeout_min.unwrap_or(10);
-        let store = Arc::new(StoreProvider::get(config.clone()));
+        let store = Arc::new(StoreProvider::get(runtime_manager.clone(), 
config.clone()));
         store.clone().start();
         let manager = AppManager {
             apps: DashMap::new(),
@@ -374,11 +375,11 @@ impl AppManager {
 }
 
 impl AppManager {
-    pub fn get_ref(config: Config) -> AppManagerRef {
-        let app_ref = Arc::new(AppManager::new(config));
+    pub fn get_ref(runtime_manager: RuntimeManager, config: Config) -> 
AppManagerRef {
+        let app_ref = Arc::new(AppManager::new(runtime_manager.clone(), 
config));
         let app_manager_ref_cloned = app_ref.clone();
 
-        tokio::spawn(async move {
+        runtime_manager.default_runtime.spawn(async move {
             info!("Starting app heartbeat checker...");
             loop {
                 // task1: find out heartbeat timeout apps
@@ -409,7 +410,7 @@ impl AppManager {
         });
 
         let app_manager_cloned = app_ref.clone();
-        tokio::spawn(async move {
+        runtime_manager.default_runtime.spawn(async move {
             info!("Starting purge event handler...");
             while let Ok(event) = app_manager_cloned.receiver.recv().await {
                 GAUGE_APP_NUMBER.dec();
@@ -587,7 +588,7 @@ mod test {
     async fn app_put_get_purge_test() {
         let app_id = "app_put_get_purge_test-----id";
 
-        let app_manager_ref = AppManager::get_ref(mock_config()).clone();
+        let app_manager_ref = AppManager::get_ref(Default::default(), 
mock_config()).clone();
         app_manager_ref.register(app_id.clone().into(), 1).unwrap();
 
         if let Some(app) = app_manager_ref.get_app("app_id".into()) {
@@ -653,7 +654,7 @@ mod test {
 
     #[tokio::test]
     async fn app_manager_test() {
-        let app_manager_ref = AppManager::get_ref(mock_config()).clone();
+        let app_manager_ref = AppManager::get_ref(Default::default(), 
mock_config()).clone();
         app_manager_ref.register("app_id".into(), 1).unwrap();
         if let Some(app) = app_manager_ref.get_app("app_id".into()) {
             assert_eq!("app_id", app.app_id);
@@ -664,7 +665,7 @@ mod test {
     async fn test_get_or_put_block_ids() {
         let app_id = "test_get_or_put_block_ids-----id".to_string();
 
-        let app_manager_ref = AppManager::get_ref(mock_config()).clone();
+        let app_manager_ref = AppManager::get_ref(Default::default(), 
mock_config()).clone();
         app_manager_ref.register(app_id.clone().into(), 1).unwrap();
 
         let app = app_manager_ref.get_app(app_id.as_ref()).unwrap();
diff --git a/rust/experimental/server/src/config.rs 
b/rust/experimental/server/src/config.rs
index 8a5dedc30..ed2864693 100644
--- a/rust/experimental/server/src/config.rs
+++ b/rust/experimental/server/src/config.rs
@@ -74,6 +74,30 @@ impl LocalfileStoreConfig {
 
 // =========================================================
 
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+#[serde(default)]
+pub struct RuntimeConfig {
+    pub read_thread_num: usize,
+    pub write_thread_num: usize,
+    pub grpc_thread_num: usize,
+    pub http_thread_num: usize,
+    pub default_thread_num: usize,
+}
+
+impl Default for RuntimeConfig {
+    fn default() -> Self {
+        RuntimeConfig {
+            read_thread_num: 10,
+            write_thread_num: 10,
+            grpc_thread_num: 20,
+            http_thread_num: 5,
+            default_thread_num: 5,
+        }
+    }
+}
+
+// =========================================================
+
 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
 pub struct HybridStoreConfig {
     pub memory_spill_high_watermark: f32,
@@ -112,6 +136,10 @@ impl Default for HybridStoreConfig {
     }
 }
 
+fn as_default_runtime_config() -> RuntimeConfig {
+    RuntimeConfig::default()
+}
+
 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
 pub struct Config {
     pub memory_store: Option<MemoryStoreConfig>,
@@ -121,6 +149,9 @@ pub struct Config {
 
     pub store_type: Option<StorageType>,
 
+    #[serde(default = "as_default_runtime_config")]
+    pub runtime_config: RuntimeConfig,
+
     pub metrics: Option<MetricsConfig>,
 
     pub grpc_port: Option<i32>,
@@ -221,7 +252,7 @@ impl Config {
 
 #[cfg(test)]
 mod test {
-    use crate::config::{Config, StorageType};
+    use crate::config::{Config, RuntimeConfig, StorageType};
     use crate::readable_size::ReadableSize;
     use std::str::FromStr;
 
@@ -261,5 +292,10 @@ mod test {
 
         let capacity = 
ReadableSize::from_str(&decoded.memory_store.unwrap().capacity).unwrap();
         assert_eq!(1024 * 1024 * 1024, capacity.as_bytes());
+
+        assert_eq!(
+            decoded.runtime_config.read_thread_num,
+            RuntimeConfig::default().read_thread_num
+        );
     }
 }
diff --git a/rust/experimental/server/src/http/http_service.rs 
b/rust/experimental/server/src/http/http_service.rs
index 0b6fd86dc..5e83dcd5e 100644
--- a/rust/experimental/server/src/http/http_service.rs
+++ b/rust/experimental/server/src/http/http_service.rs
@@ -26,7 +26,7 @@ use poem::{get, Route, RouteMethod, Server};
 use std::sync::Mutex;
 
 use crate::http::{HTTPServer, Handler};
-
+use crate::runtime::manager::RuntimeManager;
 impl ResponseError for WorkerError {
     fn status(&self) -> StatusCode {
         StatusCode::BAD_REQUEST
@@ -61,13 +61,13 @@ impl PoemHTTPServer {
 }
 
 impl HTTPServer for PoemHTTPServer {
-    fn start(&self, port: u16) {
+    fn start(&self, runtime_manager: RuntimeManager, port: u16) {
         let mut app = Route::new();
         let handlers = self.handlers.lock().unwrap();
         for handler in handlers.iter() {
             app = app.at(handler.get_route_path(), handler.get_route_method());
         }
-        tokio::spawn(async move {
+        runtime_manager.http_runtime.spawn(async move {
             let _ = Server::new(TcpListener::bind(format!("0.0.0.0:{}", port)))
                 .name("uniffle-server-http-service")
                 .run(app)
diff --git a/rust/experimental/server/src/http/mod.rs 
b/rust/experimental/server/src/http/mod.rs
index b9a31887f..a5e121972 100644
--- a/rust/experimental/server/src/http/mod.rs
+++ b/rust/experimental/server/src/http/mod.rs
@@ -26,9 +26,9 @@ use crate::http::http_service::PoemHTTPServer;
 use crate::http::jeprof::JeProfHandler;
 use crate::http::metrics::MetricsHTTPHandler;
 use crate::http::pprof::PProfHandler;
+use crate::runtime::manager::RuntimeManager;
 use once_cell::sync::Lazy;
 use poem::RouteMethod;
-
 pub static HTTP_SERVICE: Lazy<Box<PoemHTTPServer>> = Lazy::new(|| 
new_server());
 
 /// Implement the own handlers for concrete components
@@ -38,7 +38,7 @@ pub trait Handler {
 }
 
 pub trait HTTPServer: Send + Sync {
-    fn start(&self, port: u16);
+    fn start(&self, runtime_manager: RuntimeManager, port: u16);
     fn register_handler(&self, handler: impl Handler + 'static);
 }
 
diff --git a/rust/experimental/server/src/lib.rs 
b/rust/experimental/server/src/lib.rs
index b8946ce45..2ec1657b5 100644
--- a/rust/experimental/server/src/lib.rs
+++ b/rust/experimental/server/src/lib.rs
@@ -27,13 +27,15 @@ mod mem_allocator;
 pub mod metric;
 pub mod proto;
 pub mod readable_size;
+pub mod runtime;
+pub mod signal;
 pub mod store;
 pub mod util;
 
 use crate::app::AppManager;
 use crate::grpc::DefaultShuffleServer;
 use crate::http::{HTTPServer, HTTP_SERVICE};
-use crate::metric::configure_metric_service;
+use crate::metric::init_metric_service;
 use crate::proto::uniffle::shuffle_server_client::ShuffleServerClient;
 use crate::proto::uniffle::shuffle_server_server::ShuffleServerServer;
 use crate::proto::uniffle::{
@@ -41,6 +43,7 @@ use crate::proto::uniffle::{
     RequireBufferRequest, SendShuffleDataRequest, ShuffleBlock, ShuffleData,
     ShuffleRegisterRequest,
 };
+use crate::runtime::manager::RuntimeManager;
 use crate::util::gen_worker_uid;
 use anyhow::Result;
 use bytes::{Buf, Bytes, BytesMut};
@@ -53,13 +56,18 @@ pub async fn start_uniffle_worker(config: config::Config) 
-> Result<()> {
     let rpc_port = config.grpc_port.unwrap_or(19999);
     let worker_uid = gen_worker_uid(rpc_port);
     let metric_config = config.metrics.clone();
-    configure_metric_service(&metric_config, worker_uid.clone());
+
+    let runtime_manager = RuntimeManager::from(config.runtime_config.clone());
+
+    init_metric_service(runtime_manager.clone(), &metric_config, 
worker_uid.clone());
     // start the http monitor service
     let http_port = config.http_monitor_service_port.unwrap_or(20010);
-    HTTP_SERVICE.start(http_port);
+    HTTP_SERVICE.start(runtime_manager.clone(), http_port);
+
     // implement server startup
-    tokio::spawn(async move {
-        let app_manager_ref = AppManager::get_ref(config.clone());
+    let cloned_runtime_manager = runtime_manager.clone();
+    runtime_manager.grpc_runtime.spawn(async move {
+        let app_manager_ref = AppManager::get_ref(cloned_runtime_manager, 
config.clone());
         let rpc_port = config.grpc_port.unwrap_or(19999);
         info!("Starting GRpc server with port:[{}] ......", rpc_port);
         let shuffle_server = DefaultShuffleServer::from(app_manager_ref);
diff --git a/rust/experimental/server/src/main.rs 
b/rust/experimental/server/src/main.rs
index af7a1bde0..5191e4084 100644
--- a/rust/experimental/server/src/main.rs
+++ b/rust/experimental/server/src/main.rs
@@ -18,19 +18,21 @@
 #![feature(impl_trait_in_assoc_type)]
 
 use crate::app::{AppManager, AppManagerRef};
+use crate::await_tree::AWAIT_TREE_REGISTRY;
 use crate::config::{Config, LogConfig, RotationConfig};
 use crate::grpc::grpc_middleware::AwaitTreeMiddlewareLayer;
 use crate::grpc::{DefaultShuffleServer, MAX_CONNECTION_WINDOW_SIZE, 
STREAM_WINDOW_SIZE};
 use crate::http::{HTTPServer, HTTP_SERVICE};
-use crate::metric::configure_metric_service;
+use crate::metric::init_metric_service;
 use crate::proto::uniffle::coordinator_server_client::CoordinatorServerClient;
 use crate::proto::uniffle::shuffle_server_server::ShuffleServerServer;
 use crate::proto::uniffle::{ShuffleServerHeartBeatRequest, ShuffleServerId};
+use crate::runtime::manager::RuntimeManager;
+use crate::signal::details::wait_for_signal;
 use crate::util::{gen_worker_uid, get_local_ip};
+
 use anyhow::Result;
 use log::info;
-
-use crate::await_tree::AWAIT_TREE_REGISTRY;
 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
 use std::time::Duration;
 use tonic::transport::{Channel, Server};
@@ -49,19 +51,22 @@ mod mem_allocator;
 mod metric;
 pub mod proto;
 mod readable_size;
+pub mod runtime;
+pub mod signal;
 pub mod store;
 mod util;
 
 const DEFAULT_SHUFFLE_SERVER_TAG: &str = "ss_v4";
 
-async fn schedule_coordinator_report(
+fn start_coordinator_report(
+    runtime_manager: RuntimeManager,
     app_manager: AppManagerRef,
     coordinator_quorum: Vec<String>,
     grpc_port: i32,
     tags: Vec<String>,
     worker_uid: String,
 ) -> anyhow::Result<()> {
-    tokio::spawn(async move {
+    runtime_manager.default_runtime.spawn(async move {
         let ip = get_local_ip().unwrap().to_string();
 
         info!("machine ip: {}", ip.clone());
@@ -160,10 +165,11 @@ fn init_log(log: &LogConfig) -> WorkerGuard {
     _guard
 }
 
-#[tokio::main]
-async fn main() -> Result<()> {
+fn main() -> Result<()> {
     let config = Config::create_from_env();
 
+    let runtime_manager = RuntimeManager::from(config.runtime_config.clone());
+
     // init log
     let log_config = &config.log.clone().unwrap_or(Default::default());
     let _guard = init_log(log_config);
@@ -172,26 +178,26 @@ async fn main() -> Result<()> {
     let worker_uid = gen_worker_uid(rpc_port);
 
     let metric_config = config.metrics.clone();
-    configure_metric_service(&metric_config, worker_uid.clone());
+    init_metric_service(runtime_manager.clone(), &metric_config, 
worker_uid.clone());
 
     let coordinator_quorum = config.coordinator_quorum.clone();
     let tags = config.tags.clone().unwrap_or(vec![]);
-    let app_manager_ref = AppManager::get_ref(config.clone());
-    let _ = schedule_coordinator_report(
+    let app_manager_ref = AppManager::get_ref(runtime_manager.clone(), 
config.clone());
+    let _ = start_coordinator_report(
+        runtime_manager.clone(),
         app_manager_ref.clone(),
         coordinator_quorum,
         rpc_port,
         tags,
         worker_uid,
-    )
-    .await;
+    );
 
     let http_port = config.http_monitor_service_port.unwrap_or(20010);
     info!(
         "Starting http monitor service with port:[{}] ......",
         http_port
     );
-    HTTP_SERVICE.start(http_port);
+    HTTP_SERVICE.start(runtime_manager.clone(), http_port);
 
     info!("Starting GRpc server with port:[{}] ......", rpc_port);
     let shuffle_server = DefaultShuffleServer::from(app_manager_ref);
@@ -199,16 +205,21 @@ async fn main() -> Result<()> {
     let service = ShuffleServerServer::new(shuffle_server)
         .max_decoding_message_size(usize::MAX)
         .max_encoding_message_size(usize::MAX);
-    Server::builder()
-        .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
-        .initial_stream_window_size(STREAM_WINDOW_SIZE)
-        .tcp_nodelay(true)
-        .layer(AwaitTreeMiddlewareLayer::new_optional(Some(
-            AWAIT_TREE_REGISTRY.clone(),
-        )))
-        .add_service(service)
-        .serve(addr)
-        .await?;
+
+    let _grpc_service_handle = runtime_manager.grpc_runtime.spawn(async move {
+        Server::builder()
+            .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
+            .initial_stream_window_size(STREAM_WINDOW_SIZE)
+            .tcp_nodelay(true)
+            .layer(AwaitTreeMiddlewareLayer::new_optional(Some(
+                AWAIT_TREE_REGISTRY.clone(),
+            )))
+            .add_service(service)
+            .serve(addr)
+            .await
+    });
+
+    wait_for_signal();
 
     Ok(())
 }
diff --git a/rust/experimental/server/src/metric.rs 
b/rust/experimental/server/src/metric.rs
index 8a7d811d6..e7aea859e 100644
--- a/rust/experimental/server/src/metric.rs
+++ b/rust/experimental/server/src/metric.rs
@@ -16,9 +16,13 @@
 // under the License.
 
 use crate::config::MetricsConfig;
+use crate::runtime::manager::RuntimeManager;
 use log::{error, info};
 use once_cell::sync::Lazy;
-use prometheus::{labels, Histogram, HistogramOpts, IntCounter, IntGauge, 
Registry};
+use prometheus::{
+    labels, register_int_gauge_vec, Histogram, HistogramOpts, IntCounter, 
IntGauge, IntGaugeVec,
+    Registry,
+};
 use std::time::Duration;
 
 const DEFAULT_BUCKETS: &[f64; 16] = &[
@@ -157,7 +161,33 @@ pub static TOTAL_HUGE_PARTITION_REQUIRE_BUFFER_FAILED: 
Lazy<IntCounter> = Lazy::
     .expect("metrics should be created")
 });
 
+pub static GAUGE_RUNTIME_ALIVE_THREAD_NUM: Lazy<IntGaugeVec> = Lazy::new(|| {
+    register_int_gauge_vec!(
+        "runtime_thread_alive_gauge",
+        "alive thread number for runtime",
+        &["name"]
+    )
+    .unwrap()
+});
+
+pub static GAUGE_RUNTIME_IDLE_THREAD_NUM: Lazy<IntGaugeVec> = Lazy::new(|| {
+    register_int_gauge_vec!(
+        "runtime_thread_idle_gauge",
+        "idle thread number for runtime",
+        &["name"]
+    )
+    .unwrap()
+});
+
 fn register_custom_metrics() {
+    REGISTRY
+        .register(Box::new(GAUGE_RUNTIME_ALIVE_THREAD_NUM.clone()))
+        .expect("");
+
+    REGISTRY
+        .register(Box::new(GAUGE_RUNTIME_IDLE_THREAD_NUM.clone()))
+        .expect("");
+
     REGISTRY
         .register(Box::new(TOTAL_RECEIVED_DATA.clone()))
         .expect("total_received_data must be registered");
@@ -243,7 +273,11 @@ fn register_custom_metrics() {
         .expect("grpc_get_memory_data_transport_time must be registered");
 }
 
-pub fn configure_metric_service(metric_config: &Option<MetricsConfig>, 
worker_uid: String) -> bool {
+pub fn init_metric_service(
+    runtime_manager: RuntimeManager,
+    metric_config: &Option<MetricsConfig>,
+    worker_uid: String,
+) -> bool {
     if metric_config.is_none() {
         return false;
     }
@@ -258,7 +292,7 @@ pub fn configure_metric_service(metric_config: 
&Option<MetricsConfig>, worker_ui
 
     if let Some(ref _endpoint) = push_gateway_endpoint {
         let push_interval_sec = cfg.push_interval_sec.unwrap_or(60);
-        tokio::spawn(async move {
+        runtime_manager.default_runtime.spawn(async move {
             info!("Starting prometheus metrics exporter...");
             loop {
                 tokio::time::sleep(Duration::from_secs(push_interval_sec as 
u64)).await;
diff --git a/rust/experimental/server/src/runtime/manager.rs 
b/rust/experimental/server/src/runtime/manager.rs
new file mode 100644
index 000000000..0dbee3caa
--- /dev/null
+++ b/rust/experimental/server/src/runtime/manager.rs
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::config::RuntimeConfig;
+use crate::runtime::{Builder, RuntimeRef};
+use std::future::Future;
+use std::sync::Arc;
+
+#[derive(Clone, Debug)]
+pub struct RuntimeManager {
+    // for reading data
+    pub read_runtime: RuntimeRef,
+    // for writing data
+    pub write_runtime: RuntimeRef,
+    // for grpc service
+    pub grpc_runtime: RuntimeRef,
+    // for http monitor service
+    pub http_runtime: RuntimeRef,
+    // the default runtime for not important tasks.
+    // like the data purging/ heartbeat / metric push
+    pub default_runtime: RuntimeRef,
+}
+
+impl Default for RuntimeManager {
+    fn default() -> Self {
+        RuntimeManager::from(Default::default())
+    }
+}
+
+impl RuntimeManager {
+    pub fn from(config: RuntimeConfig) -> Self {
+        fn create_runtime(pool_size: usize, name: &str) -> RuntimeRef {
+            Arc::new(
+                Builder::default()
+                    .worker_threads(pool_size as usize)
+                    .thread_name(name)
+                    .enable_all()
+                    .build()
+                    .unwrap(),
+            )
+        }
+
+        Self {
+            read_runtime: create_runtime(config.read_thread_num, 
"read_thread_pool"),
+            write_runtime: create_runtime(config.write_thread_num, 
"write_thread_pool"),
+            grpc_runtime: create_runtime(config.grpc_thread_num, 
"grpc_thread_pool"),
+            http_runtime: create_runtime(config.http_thread_num, 
"http_thread_pool"),
+            default_runtime: create_runtime(config.default_thread_num, 
"default_thread_pool"),
+        }
+    }
+
+    // for test cases to wait the future
+    pub fn wait<F: Future>(&self, future: F) -> F::Output {
+        self.default_runtime.block_on(future)
+    }
+}
diff --git a/rust/experimental/server/src/runtime/metrics.rs 
b/rust/experimental/server/src/runtime/metrics.rs
new file mode 100644
index 000000000..9a40edd25
--- /dev/null
+++ b/rust/experimental/server/src/runtime/metrics.rs
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metric::{GAUGE_RUNTIME_ALIVE_THREAD_NUM, 
GAUGE_RUNTIME_IDLE_THREAD_NUM};
+use prometheus::IntGauge;
+
+#[derive(Debug)]
+pub struct Metrics {
+    pub thread_alive_gauge: IntGauge,
+    pub thread_idle_gauge: IntGauge,
+}
+
+impl Metrics {
+    pub fn new(name: &str) -> Self {
+        Self {
+            thread_alive_gauge: 
GAUGE_RUNTIME_ALIVE_THREAD_NUM.with_label_values(&[name]),
+            thread_idle_gauge: 
GAUGE_RUNTIME_IDLE_THREAD_NUM.with_label_values(&[name]),
+        }
+    }
+
+    #[inline]
+    pub fn on_thread_start(&self) {
+        self.thread_alive_gauge.inc();
+    }
+
+    #[inline]
+    pub fn on_thread_stop(&self) {
+        self.thread_alive_gauge.dec();
+    }
+
+    #[inline]
+    pub fn on_thread_park(&self) {
+        self.thread_idle_gauge.inc();
+    }
+
+    #[inline]
+    pub fn on_thread_unpark(&self) {
+        self.thread_idle_gauge.dec();
+    }
+}
diff --git a/rust/experimental/server/src/runtime/mod.rs 
b/rust/experimental/server/src/runtime/mod.rs
new file mode 100644
index 000000000..257f29ffa
--- /dev/null
+++ b/rust/experimental/server/src/runtime/mod.rs
@@ -0,0 +1,264 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod manager;
+mod metrics;
+
+use crate::runtime::metrics::Metrics;
+use anyhow::anyhow;
+use pin_project_lite::pin_project;
+use std::fmt::Debug;
+use std::{
+    future::Future,
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll},
+};
+use tokio::runtime::Builder as TokioRuntimeBuilder;
+use tokio::runtime::Runtime as TokioRuntime;
+use tokio::task::JoinHandle as TokioJoinHandle;
+
+pub type RuntimeRef = Arc<Runtime>;
+
+#[derive(Debug)]
+pub struct Runtime {
+    rt: TokioRuntime,
+    metrics: Arc<Metrics>,
+}
+
+impl Runtime {
+    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+    where
+        F: Future + Send + 'static,
+        F::Output: Send + 'static,
+    {
+        JoinHandle {
+            inner: self.rt.spawn(future),
+        }
+    }
+
+    pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
+    where
+        F: FnOnce() -> R + Send + 'static,
+        R: Send + 'static,
+    {
+        JoinHandle {
+            inner: self.rt.spawn_blocking(func),
+        }
+    }
+
+    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
+        self.rt.block_on(future)
+    }
+
+    pub fn stats(&self) -> RuntimeStats {
+        RuntimeStats {
+            alive_thread_num: self.metrics.thread_alive_gauge.get(),
+            idle_thread_num: self.metrics.thread_idle_gauge.get(),
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct RuntimeStats {
+    pub alive_thread_num: i64,
+    pub idle_thread_num: i64,
+}
+
+pin_project! {
+    #[derive(Debug)]
+    pub struct JoinHandle<T> {
+        #[pin]
+        inner: TokioJoinHandle<T>,
+    }
+}
+
+impl<T> JoinHandle<T> {
+    pub fn abort(&self) {
+        self.inner.abort();
+    }
+}
+
+impl<T> Future for JoinHandle<T> {
+    type Output = Result<T, anyhow::Error>;
+
+    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> 
{
+        let this = self.project();
+        this.inner
+            .poll(ctx)
+            .map_err(|_source| anyhow!("errors on polling for future."))
+    }
+}
+
+pub struct Builder {
+    thread_name: String,
+    builder: TokioRuntimeBuilder,
+}
+
+impl Default for Builder {
+    fn default() -> Self {
+        Self {
+            thread_name: "runtime-worker".to_string(),
+            builder: TokioRuntimeBuilder::new_multi_thread(),
+        }
+    }
+}
+
+fn with_metrics<F>(metrics: &Arc<Metrics>, f: F) -> impl Fn()
+where
+    F: Fn(&Arc<Metrics>) + 'static,
+{
+    let m = metrics.clone();
+    move || {
+        f(&m);
+    }
+}
+
+impl Builder {
+    /// Sets the number of worker threads the Runtime will use.
+    ///
+    /// This can be any number above 0
+    pub fn worker_threads(&mut self, val: usize) -> &mut Self {
+        self.builder.worker_threads(val);
+        self
+    }
+
+    /// Sets name of threads spawned by the Runtime thread pool
+    pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
+        self.thread_name = val.into();
+        self
+    }
+
+    /// Enable all feature of the underlying runtime
+    pub fn enable_all(&mut self) -> &mut Self {
+        self.builder.enable_all();
+        self
+    }
+
+    pub fn build(&mut self) -> anyhow::Result<Runtime> {
+        let metrics = Arc::new(Metrics::new(&self.thread_name));
+
+        let rt = self
+            .builder
+            .thread_name(self.thread_name.clone())
+            .on_thread_start(with_metrics(&metrics, |m| {
+                m.on_thread_start();
+            }))
+            .on_thread_stop(with_metrics(&metrics, |m| {
+                m.on_thread_stop();
+            }))
+            .on_thread_park(with_metrics(&metrics, |m| {
+                m.on_thread_park();
+            }))
+            .on_thread_unpark(with_metrics(&metrics, |m| {
+                m.on_thread_unpark();
+            }))
+            .build()?;
+
+        Ok(Runtime { rt, metrics })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::runtime::{Builder, Runtime};
+    use std::sync::Arc;
+    use std::thread;
+    use std::time::Duration;
+
+    fn create_runtime(pool_size: usize, name: &str) -> Arc<Runtime> {
+        let runtime = Builder::default()
+            .worker_threads(pool_size as usize)
+            .thread_name(name)
+            .enable_all()
+            .build();
+        assert!(runtime.is_ok());
+        Arc::new(runtime.unwrap())
+    }
+
+    #[test]
+    fn test_metrics() {
+        let runtime = create_runtime(2usize, "test_metrics");
+        thread::sleep(Duration::from_millis(60));
+
+        let stats = runtime.stats();
+        assert_eq!(2, stats.idle_thread_num);
+        assert_eq!(2, stats.alive_thread_num);
+
+        runtime.spawn(async {
+            thread::sleep(Duration::from_millis(1000));
+        });
+
+        // waiting the task is invoked.
+        thread::sleep(Duration::from_millis(50));
+
+        let stats = runtime.stats();
+        assert_eq!(2, stats.alive_thread_num);
+        assert_eq!(1, stats.idle_thread_num);
+    }
+
+    #[test]
+    fn test_nested_spawn() {
+        let runtime = create_runtime(4usize, "test_nested_spawn");
+        let cloned_rt = runtime.clone();
+
+        let handle = runtime.spawn(async move {
+            let mut counter = 0;
+            for _ in 0..3 {
+                counter += cloned_rt
+                    .spawn(async move {
+                        thread::sleep(Duration::from_millis(50));
+                        1
+                    })
+                    .await
+                    .unwrap()
+            }
+            counter
+        });
+
+        assert_eq!(3, runtime.block_on(handle).unwrap())
+    }
+
+    #[test]
+    fn test_spawn_block() {
+        let runtime = create_runtime(2usize, "test_spawn_block");
+
+        let handle = runtime.spawn(async {
+            thread::sleep(Duration::from_millis(50));
+            1
+        });
+
+        assert_eq!(1, runtime.block_on(handle).unwrap());
+    }
+
+    #[test]
+    fn test_spawn() {
+        let runtime = create_runtime(2usize, "test_spawn");
+
+        let rt_cloned = runtime.clone();
+
+        let res = runtime.block_on(async {
+            rt_cloned
+                .spawn_blocking(move || {
+                    thread::sleep(Duration::from_millis(100));
+                    2
+                })
+                .await
+        });
+        assert_eq!(res.unwrap(), 2);
+    }
+}
diff --git a/rust/experimental/server/src/signal.rs 
b/rust/experimental/server/src/signal.rs
new file mode 100644
index 000000000..aa96c3ec7
--- /dev/null
+++ b/rust/experimental/server/src/signal.rs
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[cfg(unix)]
+pub mod details {
+    use log::info;
+    use signal_hook::{consts::TERM_SIGNALS, iterator::Signals};
+
+    pub fn wait_for_signal() {
+        let mut sigs = Signals::new(TERM_SIGNALS).expect("Failed to register 
signal handlers");
+
+        for signal in &mut sigs {
+            if TERM_SIGNALS.contains(&signal) {
+                info!("Received signal {}, stopping server...", signal);
+                break;
+            }
+        }
+    }
+}
+
+#[cfg(not(unix))]
+pub mod details {
+    use std::thread;
+    use std::time::Duration;
+
+    // Non-unix fallback: no signal API, so just park forever (todo: handle elegantly).
+    pub fn wait_for_signal() {
+        loop {
+            thread::sleep(Duration::from_millis(20));
+        }
+    }
+}
diff --git a/rust/experimental/server/src/store/hybrid.rs 
b/rust/experimental/server/src/store/hybrid.rs
index 6dca1b009..93dcc9016 100644
--- a/rust/experimental/server/src/store/hybrid.rs
+++ b/rust/experimental/server/src/store/hybrid.rs
@@ -50,6 +50,7 @@ use await_tree::InstrumentAwait;
 use std::str::FromStr;
 use std::sync::Arc;
 
+use crate::runtime::manager::RuntimeManager;
 use tokio::sync::{Mutex, Semaphore};
 
 trait PersistentStore: Store + Persistent + Send + Sync {}
@@ -79,6 +80,8 @@ pub struct HybridStore {
 
     memory_watermark_flush_trigger_sender: async_channel::Sender<()>,
     memory_watermark_flush_trigger_recv: async_channel::Receiver<()>,
+
+    runtime_manager: RuntimeManager,
 }
 
 struct SpillMessage {
@@ -90,7 +93,7 @@ unsafe impl Send for HybridStore {}
 unsafe impl Sync for HybridStore {}
 
 impl HybridStore {
-    pub fn from(config: Config) -> Self {
+    pub fn from(config: Config, runtime_manager: RuntimeManager) -> Self {
         let store_type = &config.store_type.unwrap_or(StorageType::MEMORY);
         if !StorageType::contains_memory(&store_type) {
             panic!("Storage type must contains memory.");
@@ -98,7 +101,8 @@ impl HybridStore {
 
         let mut persistent_stores: VecDeque<Box<dyn PersistentStore>> = 
VecDeque::with_capacity(2);
         if StorageType::contains_localfile(&store_type) {
-            let localfile_store = 
LocalFileStore::from(config.localfile_store.unwrap());
+            let localfile_store =
+                LocalFileStore::from(config.localfile_store.unwrap(), 
runtime_manager.clone());
             persistent_stores.push_back(Box::new(localfile_store));
         }
 
@@ -126,7 +130,10 @@ impl HybridStore {
         let (watermark_flush_send, watermark_flush_recv) = 
async_channel::unbounded();
 
         let store = HybridStore {
-            hot_store: 
Box::new(MemoryStore::from(config.memory_store.unwrap())),
+            hot_store: Box::new(MemoryStore::from(
+                config.memory_store.unwrap(),
+                runtime_manager.clone(),
+            )),
             warm_store: persistent_stores.pop_front(),
             cold_store: persistent_stores.pop_front(),
             config: hybrid_conf,
@@ -138,6 +145,7 @@ impl HybridStore {
             memory_spill_max_concurrency,
             memory_watermark_flush_trigger_sender: watermark_flush_send,
             memory_watermark_flush_trigger_recv: watermark_flush_recv,
+            runtime_manager,
         };
         store
     }
@@ -318,7 +326,7 @@ impl Store for HybridStore {
 
         // the handler to accept watermark flush trigger
         let hybrid_store = self.clone();
-        tokio::spawn(async move {
+        self.runtime_manager.default_runtime.spawn(async move {
             let store = hybrid_store.clone();
             while let Ok(_) = 
&store.memory_watermark_flush_trigger_recv.recv().await {
                 if let Err(e) = watermark_flush(store.clone()).await {
@@ -331,7 +339,7 @@ impl Store for HybridStore {
         let store = self.clone();
         let concurrency_limiter =
             Arc::new(Semaphore::new(store.memory_spill_max_concurrency as 
usize));
-        tokio::spawn(async move {
+        self.runtime_manager.default_runtime.spawn(async move {
             while let Ok(message) = store.memory_spill_recv.recv().await {
                 let await_root = await_tree_registry
                     .register(format!("hot->warm flush."))
@@ -344,33 +352,36 @@ impl Store for HybridStore {
                 TOTAL_MEMORY_SPILL_OPERATION.inc();
                 GAUGE_MEMORY_SPILL_OPERATION.inc();
                 let store_cloned = store.clone();
-                tokio::spawn(await_root.instrument(async move {
-                    let mut size = 0u64;
-                    for block in &message.ctx.data_blocks {
-                        size += block.length as u64;
-                    }
-                    match store_cloned
-                        .memory_spill_to_persistent_store(message.ctx.clone(), 
message.id)
-                        .await
-                    {
-                        Ok(msg) => {
-                            
store_cloned.hot_store.desc_to_in_flight_buffer_size(size);
-                            debug!("{}", msg)
+                store
+                    .runtime_manager
+                    .write_runtime
+                    .spawn(await_root.instrument(async move {
+                        let mut size = 0u64;
+                        for block in &message.ctx.data_blocks {
+                            size += block.length as u64;
                         }
-                        Err(error) => {
-                            TOTAL_MEMORY_SPILL_OPERATION_FAILED.inc();
-                            error!(
+                        match store_cloned
+                            
.memory_spill_to_persistent_store(message.ctx.clone(), message.id)
+                            .await
+                        {
+                            Ok(msg) => {
+                                
store_cloned.hot_store.desc_to_in_flight_buffer_size(size);
+                                debug!("{}", msg)
+                            }
+                            Err(error) => {
+                                TOTAL_MEMORY_SPILL_OPERATION_FAILED.inc();
+                                error!(
                                 "Errors on spill memory data to persistent 
storage. error: {:#?}",
                                 error
                             );
-                            // re-push to the queue to execute
-                            let _ = 
store_cloned.memory_spill_send.send(message).await;
+                                // re-push to the queue to execute
+                                let _ = 
store_cloned.memory_spill_send.send(message).await;
+                            }
                         }
-                    }
-                    store_cloned.memory_spill_event_num.dec_by(1);
-                    GAUGE_MEMORY_SPILL_OPERATION.dec();
-                    drop(concurrency_guarder);
-                }));
+                        store_cloned.memory_spill_event_num.dec_by(1);
+                        GAUGE_MEMORY_SPILL_OPERATION.dec();
+                        drop(concurrency_guarder);
+                    }));
             }
         });
     }
@@ -535,6 +546,7 @@ mod tests {
     use std::collections::VecDeque;
 
     use std::sync::Arc;
+    use std::thread;
 
     use std::time::Duration;
 
@@ -556,14 +568,16 @@ mod tests {
         assert_eq!(false, is_apple(&Banana {}));
     }
 
-    #[tokio::test]
-    async fn test_only_memory() {
+    #[test]
+    fn test_only_memory() {
         let mut config = Config::default();
         config.memory_store = Some(MemoryStoreConfig::new("20M".to_string()));
         config.hybrid_store = Some(HybridStoreConfig::new(0.8, 0.2, None));
         config.store_type = Some(StorageType::MEMORY);
-        let store = HybridStore::from(config);
-        assert_eq!(true, store.is_healthy().await.unwrap());
+        let store = HybridStore::from(config, Default::default());
+
+        let runtime = store.runtime_manager.clone();
+        assert_eq!(true, runtime.wait(store.is_healthy()).unwrap());
     }
 
     #[test]
@@ -599,7 +613,7 @@ mod tests {
 
         // The hybrid store will flush the memory data to file when
         // the data reaches the number of 4
-        let store = Arc::new(HybridStore::from(config));
+        let store = Arc::new(HybridStore::from(config, Default::default()));
         store
     }
 
@@ -630,8 +644,8 @@ mod tests {
         block_ids
     }
 
-    #[tokio::test]
-    async fn single_buffer_spill_test() -> anyhow::Result<()> {
+    #[test]
+    fn single_buffer_spill_test() -> anyhow::Result<()> {
         let data = b"hello world!";
         let data_len = data.len();
 
@@ -641,33 +655,37 @@ mod tests {
         );
         store.clone().start();
 
+        let runtime = store.runtime_manager.clone();
+
         let uid = PartitionedUId {
             app_id: "1000".to_string(),
             shuffle_id: 0,
             partition_id: 0,
         };
-        let expected_block_ids =
-            write_some_data(store.clone(), uid.clone(), data_len as i32, data, 
100).await;
-        tokio::time::sleep(Duration::from_secs(1)).await;
+        let expected_block_ids = runtime.wait(write_some_data(
+            store.clone(),
+            uid.clone(),
+            data_len as i32,
+            data,
+            100,
+        ));
+
+        thread::sleep(Duration::from_secs(1));
 
         // read from memory and then from localfile
-        let response_data = store
-            .get(ReadingViewContext {
-                uid: uid.clone(),
-                reading_options: MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1024 * 
1024 * 1024),
-            })
-            .await?;
+        let response_data = runtime.wait(store.get(ReadingViewContext {
+            uid: uid.clone(),
+            reading_options: MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1024 * 1024 
* 1024),
+        }))?;
 
         let mut accepted_block_ids = vec![];
         for segment in response_data.from_memory().shuffle_data_block_segments 
{
             accepted_block_ids.push(segment.block_id);
         }
 
-        let local_index_data = store
-            .get_index(ReadingIndexViewContext {
-                partition_id: uid.clone(),
-            })
-            .await?;
+        let local_index_data = 
runtime.wait(store.get_index(ReadingIndexViewContext {
+            partition_id: uid.clone(),
+        }))?;
 
         match local_index_data {
             ResponseDataIndex::Local(index) => {
@@ -691,6 +709,7 @@ mod tests {
             }
         }
 
+        accepted_block_ids.sort();
         assert_eq!(accepted_block_ids, expected_block_ids);
 
         Ok(())
@@ -781,19 +800,26 @@ mod tests {
         // then again.
     }
 
-    #[tokio::test]
-    async fn test_insert_and_get_from_memory() {
+    #[test]
+    fn test_insert_and_get_from_memory() {
         let data = b"hello world!";
         let data_len = data.len();
 
         let store = start_store(None, ((data_len * 1) as i64).to_string());
+        let runtime = store.runtime_manager.clone();
 
         let uid = PartitionedUId {
             app_id: "1000".to_string(),
             shuffle_id: 0,
             partition_id: 0,
         };
-        write_some_data(store.clone(), uid.clone(), data_len as i32, data, 
4).await;
+        runtime.wait(write_some_data(
+            store.clone(),
+            uid.clone(),
+            data_len as i32,
+            data,
+            4,
+        ));
         let mut last_block_id = -1;
         // read data one by one
         for idx in 0..=10 {
@@ -805,7 +831,7 @@ mod tests {
                 ),
             };
 
-            let read_data = store.get(reading_view_ctx).await;
+            let read_data = runtime.wait(store.get(reading_view_ctx));
             if read_data.is_err() {
                 panic!();
             }
diff --git a/rust/experimental/server/src/store/localfile.rs 
b/rust/experimental/server/src/store/localfile.rs
index 2391a2642..e85745851 100644
--- a/rust/experimental/server/src/store/localfile.rs
+++ b/rust/experimental/server/src/store/localfile.rs
@@ -37,6 +37,7 @@ use dashmap::DashMap;
 
 use log::{debug, error, info, warn};
 
+use crate::runtime::manager::RuntimeManager;
 use std::io::SeekFrom;
 use std::path::Path;
 use std::sync::atomic::{AtomicBool, Ordering};
@@ -57,6 +58,8 @@ pub struct LocalFileStore {
     partition_written_disk_map: DashMap<String, DashMap<i32, DashMap<i32, 
Arc<LocalDisk>>>>,
     partition_file_locks: DashMap<String, Arc<RwLock<()>>>,
     healthy_check_min_disks: i32,
+
+    runtime_manager: RuntimeManager,
 }
 
 impl Persistent for LocalFileStore {}
@@ -65,20 +68,27 @@ unsafe impl Send for LocalFileStore {}
 unsafe impl Sync for LocalFileStore {}
 
 impl LocalFileStore {
+    // only for test cases
     pub fn new(local_disks: Vec<String>) -> Self {
         let mut local_disk_instances = vec![];
+        let runtime_manager: RuntimeManager = Default::default();
         for path in local_disks {
-            local_disk_instances.push(LocalDisk::new(path, 
LocalDiskConfig::default()));
+            local_disk_instances.push(LocalDisk::new(
+                path,
+                LocalDiskConfig::default(),
+                runtime_manager.clone(),
+            ));
         }
         LocalFileStore {
             local_disks: local_disk_instances,
             partition_written_disk_map: DashMap::new(),
             partition_file_locks: DashMap::new(),
             healthy_check_min_disks: 1,
+            runtime_manager,
         }
     }
 
-    pub fn from(localfile_config: LocalfileStoreConfig) -> Self {
+    pub fn from(localfile_config: LocalfileStoreConfig, runtime_manager: 
RuntimeManager) -> Self {
         let mut local_disk_instances = vec![];
         for path in localfile_config.data_paths {
             let config = LocalDiskConfig {
@@ -87,13 +97,14 @@ impl LocalFileStore {
                 max_concurrency: 
localfile_config.disk_max_concurrency.unwrap_or(40),
             };
 
-            local_disk_instances.push(LocalDisk::new(path, config));
+            local_disk_instances.push(LocalDisk::new(path, config, 
runtime_manager.clone()));
         }
         LocalFileStore {
             local_disks: local_disk_instances,
             partition_written_disk_map: DashMap::new(),
             partition_file_locks: DashMap::new(),
             healthy_check_min_disks: 
localfile_config.healthy_check_min_disks.unwrap_or(1),
+            runtime_manager,
         }
     }
 
@@ -485,7 +496,7 @@ struct LocalDisk {
 }
 
 impl LocalDisk {
-    fn new(path: String, config: LocalDiskConfig) -> Arc<Self> {
+    fn new(path: String, config: LocalDiskConfig, runtime_manager: 
RuntimeManager) -> Arc<Self> {
         create_directory_if_not_exists(&path);
         let instance = LocalDisk {
             base_path: path,
@@ -496,8 +507,9 @@ impl LocalDisk {
         };
         let instance = Arc::new(instance);
 
+        let runtime = runtime_manager.default_runtime.clone();
         let cloned = instance.clone();
-        tokio::spawn(async {
+        runtime.spawn(async {
             info!(
                 "Starting the disk healthy checking, base path: {}",
                 &cloned.base_path
@@ -736,16 +748,20 @@ mod test {
     use bytes::{Buf, Bytes, BytesMut};
     use log::info;
 
+    use crate::runtime::manager::RuntimeManager;
     use std::io::Read;
+    use std::thread;
     use std::time::Duration;
 
-    #[tokio::test]
-    async fn purge_test() -> anyhow::Result<()> {
+    #[test]
+    fn purge_test() -> anyhow::Result<()> {
         let temp_dir = tempdir::TempDir::new("test_local_store").unwrap();
         let temp_path = temp_dir.path().to_str().unwrap().to_string();
         println!("init local file path: {}", &temp_path);
         let local_store = LocalFileStore::new(vec![temp_path.clone()]);
 
+        let runtime = local_store.runtime_manager.clone();
+
         let app_id = "purge_test-app-id".to_string();
         let uid = PartitionedUId {
             app_id: app_id.clone(),
@@ -777,35 +793,36 @@ mod test {
             ],
         };
 
-        let insert_result = local_store.insert(writing_ctx).await;
+        let insert_result = runtime.wait(local_store.insert(writing_ctx));
         if insert_result.is_err() {
             println!("{:?}", insert_result.err());
             panic!()
         }
         assert_eq!(
             true,
-            tokio::fs::try_exists(format!(
+            runtime.wait(tokio::fs::try_exists(format!(
                 "{}/{}/{}/partition-{}.data",
                 &temp_path, &app_id, "0", "0"
-            ))
-            .await?
+            )))?
         );
-        local_store.purge(app_id.clone()).await?;
+        runtime.wait(local_store.purge(app_id.clone()))?;
         assert_eq!(
             false,
-            tokio::fs::try_exists(format!("{}/{}", &temp_path, &app_id)).await?
+            runtime.wait(tokio::fs::try_exists(format!("{}/{}", &temp_path, 
&app_id)))?
         );
 
         Ok(())
     }
 
-    #[tokio::test]
-    async fn local_store_test() {
+    #[test]
+    fn local_store_test() {
         let temp_dir = tempdir::TempDir::new("test_local_store").unwrap();
         let temp_path = temp_dir.path().to_str().unwrap().to_string();
         info!("init local file path: {}", temp_path);
         let mut local_store = LocalFileStore::new(vec![temp_path]);
 
+        let runtime = local_store.runtime_manager.clone();
+
         let uid = PartitionedUId {
             app_id: "100".to_string(),
             shuffle_id: 0,
@@ -836,7 +853,7 @@ mod test {
             ],
         };
 
-        let insert_result = local_store.insert(writing_ctx).await;
+        let insert_result = runtime.wait(local_store.insert(writing_ctx));
         if insert_result.is_err() {
             println!("{:?}", insert_result.err());
             panic!()
@@ -867,25 +884,29 @@ mod test {
         }
 
         // case1: read the one partition block data
-        get_and_check_partitial_data(&mut local_store, uid.clone(), size as 
i64, data).await;
+        runtime.wait(get_and_check_partitial_data(
+            &mut local_store,
+            uid.clone(),
+            size as i64,
+            data,
+        ));
 
         // case2: read the complete block data
         let mut expected = BytesMut::with_capacity(size * 2);
         expected.extend_from_slice(data);
         expected.extend_from_slice(data);
-        get_and_check_partitial_data(
+        runtime.wait(get_and_check_partitial_data(
             &mut local_store,
             uid.clone(),
             size as i64 * 2,
             expected.freeze().as_ref(),
-        )
-        .await;
+        ));
 
         // case3: get the index data
         let reading_index_view_ctx = ReadingIndexViewContext {
             partition_id: uid.clone(),
         };
-        let result = local_store.get_index(reading_index_view_ctx).await;
+        let result = 
runtime.wait(local_store.get_index(reading_index_view_ctx));
         if result.is_err() {
             panic!()
         }
@@ -913,65 +934,84 @@ mod test {
         temp_dir.close().unwrap();
     }
 
-    #[tokio::test]
-    async fn test_local_disk_delete_operation() {
+    #[test]
+    fn test_local_disk_delete_operation() {
         let temp_dir = 
tempdir::TempDir::new("test_local_disk_delete_operation-dir").unwrap();
         let temp_path = temp_dir.path().to_str().unwrap().to_string();
 
         println!("init the path: {}", &temp_path);
 
-        let local_disk = LocalDisk::new(temp_path.clone(), 
LocalDiskConfig::default());
+        let runtime: RuntimeManager = Default::default();
+        let local_disk = LocalDisk::new(
+            temp_path.clone(),
+            LocalDiskConfig::default(),
+            runtime.clone(),
+        );
 
         let data = b"hello!";
-        local_disk
-            .write(Bytes::copy_from_slice(data), "a/b".to_string())
-            .await
+        runtime
+            .wait(local_disk.write(Bytes::copy_from_slice(data), 
"a/b".to_string()))
             .unwrap();
 
         assert_eq!(
             true,
-            tokio::fs::try_exists(format!("{}/{}", &temp_path, 
"a/b".to_string()))
-                .await
+            runtime
+                .wait(tokio::fs::try_exists(format!(
+                    "{}/{}",
+                    &temp_path,
+                    "a/b".to_string()
+                )))
                 .unwrap()
         );
 
-        local_disk
-            .delete("a/".to_string())
-            .await
+        runtime
+            .wait(local_disk.delete("a/".to_string()))
             .expect("TODO: panic message");
         assert_eq!(
             false,
-            tokio::fs::try_exists(format!("{}/{}", &temp_path, 
"a/b".to_string()))
-                .await
+            runtime
+                .wait(tokio::fs::try_exists(format!(
+                    "{}/{}",
+                    &temp_path,
+                    "a/b".to_string()
+                )))
                 .unwrap()
         );
     }
 
-    #[tokio::test]
-    async fn local_disk_corruption_healthy_check() {
+    #[test]
+    fn local_disk_corruption_healthy_check() {
         let temp_dir = tempdir::TempDir::new("test_directory").unwrap();
         let temp_path = temp_dir.path().to_str().unwrap().to_string();
 
-        let local_disk = LocalDisk::new(temp_path.clone(), 
LocalDiskConfig::create_mocked_config());
+        let local_disk = LocalDisk::new(
+            temp_path.clone(),
+            LocalDiskConfig::create_mocked_config(),
+            Default::default(),
+        );
 
-        tokio::time::sleep(Duration::from_secs(12)).await;
+        thread::sleep(Duration::from_secs(12));
         assert_eq!(true, local_disk.is_healthy().unwrap());
         assert_eq!(false, local_disk.is_corrupted().unwrap());
     }
 
-    #[tokio::test]
-    async fn local_disk_test() {
+    #[test]
+    fn local_disk_test() {
         let temp_dir = tempdir::TempDir::new("test_directory").unwrap();
         let temp_path = temp_dir.path().to_str().unwrap().to_string();
 
-        let local_disk = LocalDisk::new(temp_path.clone(), 
LocalDiskConfig::default());
+        let runtime: RuntimeManager = Default::default();
+        let local_disk = LocalDisk::new(
+            temp_path.clone(),
+            LocalDiskConfig::default(),
+            runtime.clone(),
+        );
 
         let data = b"Hello, World!";
 
         let relative_path = "app-id/test_file.txt";
-        let write_result = local_disk
-            .write(Bytes::copy_from_slice(data), relative_path.to_string())
-            .await;
+        let write_result =
+            runtime.wait(local_disk.write(Bytes::copy_from_slice(data), 
relative_path.to_string()));
         assert!(write_result.is_ok());
 
         // test whether the content is written
@@ -982,14 +1022,15 @@ mod test {
         assert_eq!(file_content, data);
 
         // if the file has been created, append some content
-        let write_result = local_disk
-            .write(Bytes::copy_from_slice(data), relative_path.to_string())
-            .await;
+        let write_result =
+            runtime.wait(local_disk.write(Bytes::copy_from_slice(data), 
relative_path.to_string()));
         assert!(write_result.is_ok());
 
-        let read_result = local_disk
-            .read(relative_path.to_string(), 0, Some(data.len() as i64 * 2))
-            .await;
+        let read_result = runtime.wait(local_disk.read(
+            relative_path.to_string(),
+            0,
+            Some(data.len() as i64 * 2),
+        ));
         assert!(read_result.is_ok());
         let read_data = read_result.unwrap();
         let expected = b"Hello, World!Hello, World!";
diff --git a/rust/experimental/server/src/store/memory.rs 
b/rust/experimental/server/src/store/memory.rs
index a380a7520..0d0852d0b 100644
--- a/rust/experimental/server/src/store/memory.rs
+++ b/rust/experimental/server/src/store/memory.rs
@@ -58,12 +58,14 @@ pub struct MemoryStore {
     buffer_ticket_timeout_sec: i64,
     buffer_ticket_check_interval_sec: i64,
     in_flush_buffer_size: AtomicU64,
+    runtime_manager: RuntimeManager,
 }
 
 unsafe impl Send for MemoryStore {}
 unsafe impl Sync for MemoryStore {}
 
 impl MemoryStore {
+    // only for test cases
     pub fn new(max_memory_size: i64) -> Self {
         MemoryStore {
             state: DashMap::new(),
@@ -73,10 +75,11 @@ impl MemoryStore {
             buffer_ticket_timeout_sec: 5 * 60,
             buffer_ticket_check_interval_sec: 10,
             in_flush_buffer_size: Default::default(),
+            runtime_manager: Default::default(),
         }
     }
 
-    pub fn from(conf: MemoryStoreConfig) -> Self {
+    pub fn from(conf: MemoryStoreConfig, runtime_manager: RuntimeManager) -> 
Self {
         let capacity = ReadableSize::from_str(&conf.capacity).unwrap();
         MemoryStore {
             state: DashMap::new(),
@@ -86,6 +89,7 @@ impl MemoryStore {
             buffer_ticket_timeout_sec: 
conf.buffer_ticket_timeout_sec.unwrap_or(5 * 60),
             buffer_ticket_check_interval_sec: 10,
             in_flush_buffer_size: Default::default(),
+            runtime_manager,
         }
     }
 
@@ -298,7 +302,7 @@ impl Store for MemoryStore {
     fn start(self: Arc<Self>) {
         // schedule check to find out the timeout allocated buffer ticket
         let mem_store = self.clone();
-        tokio::spawn(async move {
+        self.runtime_manager.default_runtime.spawn(async move {
             loop {
                 mem_store.check_allocated_tickets().await;
                 delay_for(Duration::from_secs(
@@ -718,17 +722,19 @@ mod test {
     use bytes::BytesMut;
     use core::panic;
     use std::sync::Arc;
+    use std::thread;
     use std::time::Duration;
 
     use crate::config::MemoryStoreConfig;
+    use crate::runtime::manager::RuntimeManager;
     use anyhow::Result;
-    use tokio::time::sleep as delay_for;
 
-    #[tokio::test]
-    async fn test_ticket_timeout() -> Result<()> {
+    #[test]
+    fn test_ticket_timeout() -> Result<()> {
         let cfg = MemoryStoreConfig::from("2M".to_string(), 1);
+        let runtime_manager: RuntimeManager = Default::default();
+        let mut store = MemoryStore::from(cfg, runtime_manager.clone());
 
-        let mut store = MemoryStore::from(cfg);
         store.refresh_buffer_ticket_check_interval_sec(1);
 
         let store = Arc::new(store);
@@ -736,69 +742,76 @@ mod test {
 
         let app_id = "mocked-app-id";
         let ctx = 
RequireBufferContext::new(PartitionedUId::from(app_id.to_string(), 1, 1), 1000);
-        let resp = store.require_buffer(ctx.clone()).await?;
+        let resp = runtime_manager.wait(store.require_buffer(ctx.clone()))?;
         assert!(store.is_ticket_exist(app_id, resp.ticket_id));
 
-        let snapshot = store.budget.snapshot().await;
+        let snapshot = runtime_manager.wait(store.budget.snapshot());
         assert_eq!(snapshot.allocated, 1000);
         assert_eq!(snapshot.used, 0);
 
-        delay_for(Duration::from_secs(5)).await;
+        thread::sleep(Duration::from_secs(5));
 
         assert!(!store.is_ticket_exist(app_id, resp.ticket_id));
 
-        let snapshot = store.budget.snapshot().await;
+        let snapshot = runtime_manager.wait(store.budget.snapshot());
         assert_eq!(snapshot.allocated, 0);
         assert_eq!(snapshot.used, 0);
 
         Ok(())
     }
 
-    #[tokio::test]
-    async fn test_memory_buffer_ticket() -> Result<()> {
+    #[test]
+    fn test_memory_buffer_ticket() -> Result<()> {
         let store = MemoryStore::new(1024 * 1000);
+        let runtime = store.runtime_manager.clone();
 
         let app_id = "mocked-app-id";
         let ctx = 
RequireBufferContext::new(PartitionedUId::from(app_id.to_string(), 1, 1), 1000);
-        let resp = store.require_buffer(ctx.clone()).await?;
+        let resp = runtime.wait(store.require_buffer(ctx.clone()))?;
         let ticket_id_1 = resp.ticket_id;
 
-        let resp = store.require_buffer(ctx.clone()).await?;
+        let resp = runtime.wait(store.require_buffer(ctx.clone()))?;
         let ticket_id_2 = resp.ticket_id;
 
         assert!(store.is_ticket_exist(app_id, ticket_id_1));
         assert!(store.is_ticket_exist(app_id, ticket_id_2));
         assert!(!store.is_ticket_exist(app_id, 100239));
 
-        let snapshot = store.budget.snapshot().await;
+        let snapshot = runtime.wait(store.budget.snapshot());
         assert_eq!(snapshot.allocated, 1000 * 2);
         assert_eq!(snapshot.used, 0);
 
-        store.purge(app_id.to_string()).await?;
+        runtime.wait(store.purge(app_id.to_string()))?;
 
-        let snapshot = store.budget.snapshot().await;
+        let snapshot = runtime.wait(store.budget.snapshot());
         assert_eq!(snapshot.allocated, 0);
         assert_eq!(snapshot.used, 0);
 
         Ok(())
     }
 
-    #[tokio::test]
-    async fn test_read_buffer_in_flight() {
+    #[test]
+    fn test_read_buffer_in_flight() {
         let store = MemoryStore::new(1024);
+        let runtime = store.runtime_manager.clone();
+
         let uid = PartitionedUId {
             app_id: "100".to_string(),
             shuffle_id: 0,
             partition_id: 0,
         };
         let writing_view_ctx = create_writing_ctx_with_blocks(10, 10, 
uid.clone());
-        let _ = store.insert(writing_view_ctx).await;
+        let _ = runtime.wait(store.insert(writing_view_ctx));
 
         let default_single_read_size = 20;
 
         // case1: read from -1
-        let mem_data =
-            get_data_with_last_block_id(default_single_read_size, -1, &store, 
uid.clone()).await;
+        let mem_data = runtime.wait(get_data_with_last_block_id(
+            default_single_read_size,
+            -1,
+            &store,
+            uid.clone(),
+        ));
         assert_eq!(2, mem_data.shuffle_data_block_segments.len());
         assert_eq!(
             0,
@@ -818,8 +831,12 @@ mod test {
         );
 
         // case2: when the last_block_id doesn't exist, it should return the 
data like when last_block_id=-1
-        let mem_data =
-            get_data_with_last_block_id(default_single_read_size, 100, &store, 
uid.clone()).await;
+        let mem_data = runtime.wait(get_data_with_last_block_id(
+            default_single_read_size,
+            100,
+            &store,
+            uid.clone(),
+        ));
         assert_eq!(2, mem_data.shuffle_data_block_segments.len());
         assert_eq!(
             0,
@@ -839,8 +856,12 @@ mod test {
         );
 
         // case3: read from 3
-        let mem_data =
-            get_data_with_last_block_id(default_single_read_size, 3, &store, 
uid.clone()).await;
+        let mem_data = runtime.wait(get_data_with_last_block_id(
+            default_single_read_size,
+            3,
+            &store,
+            uid.clone(),
+        ));
         assert_eq!(2, mem_data.shuffle_data_block_segments.len());
         assert_eq!(
             4,
@@ -861,7 +882,7 @@ mod test {
 
         // case4: some data are in inflight blocks
         let buffer = 
store.get_or_create_underlying_staging_buffer(uid.clone());
-        let mut buffer = buffer.lock().await;
+        let mut buffer = runtime.wait(buffer.lock());
         let owned = buffer.staging.to_owned();
         buffer.staging.clear();
         let mut idx = 0;
@@ -872,8 +893,12 @@ mod test {
         drop(buffer);
 
         // all data will be fetched from in_flight data
-        let mem_data =
-            get_data_with_last_block_id(default_single_read_size, 3, &store, 
uid.clone()).await;
+        let mem_data = runtime.wait(get_data_with_last_block_id(
+            default_single_read_size,
+            3,
+            &store,
+            uid.clone(),
+        ));
         assert_eq!(2, mem_data.shuffle_data_block_segments.len());
         assert_eq!(
             4,
@@ -895,7 +920,7 @@ mod test {
         // case5: old data in in_flight and latest data in staging.
         // read it from the block id 9, and read size of 30
         let buffer = 
store.get_or_create_underlying_staging_buffer(uid.clone());
-        let mut buffer = buffer.lock().await;
+        let mut buffer = runtime.wait(buffer.lock());
         buffer.staging.push(PartitionedDataBlock {
             block_id: 20,
             length: 10,
@@ -906,7 +931,7 @@ mod test {
         });
         drop(buffer);
 
-        let mem_data = get_data_with_last_block_id(30, 7, &store, 
uid.clone()).await;
+        let mem_data = runtime.wait(get_data_with_last_block_id(30, 7, &store, 
uid.clone()));
         assert_eq!(3, mem_data.shuffle_data_block_segments.len());
         assert_eq!(
             8,
@@ -934,7 +959,7 @@ mod test {
         );
 
         // case6: read the end to return empty result
-        let mem_data = get_data_with_last_block_id(30, 20, &store, 
uid.clone()).await;
+        let mem_data = runtime.wait(get_data_with_last_block_id(30, 20, 
&store, uid.clone()));
         assert_eq!(0, mem_data.shuffle_data_block_segments.len());
     }
 
@@ -980,9 +1005,11 @@ mod test {
         WritingViewContext { uid, data_blocks }
     }
 
-    #[tokio::test]
-    async fn test_allocated_and_purge_for_memory() {
+    #[test]
+    fn test_allocated_and_purge_for_memory() {
         let store = MemoryStore::new(1024 * 1024 * 1024);
+        let runtime = store.runtime_manager.clone();
+
         let ctx = RequireBufferContext {
             uid: PartitionedUId {
                 app_id: "100".to_string(),
@@ -991,9 +1018,11 @@ mod test {
             },
             size: 10000,
         };
-        match store.require_buffer(ctx).await {
+        match runtime.default_runtime.block_on(store.require_buffer(ctx)) {
             Ok(_) => {
-                let _ = store.purge("100".to_string()).await;
+                let _ = runtime
+                    .default_runtime
+                    .block_on(store.purge("100".to_string()));
             }
             _ => panic!(),
         }
@@ -1004,9 +1033,11 @@ mod test {
         assert_eq!(1024 * 1024 * 1024, budget.capacity);
     }
 
-    #[tokio::test]
-    async fn test_purge() -> Result<()> {
+    #[test]
+    fn test_purge() -> Result<()> {
         let store = MemoryStore::new(1024);
+        let runtime = store.runtime_manager.clone();
+
         let app_id = "purge_app";
         let shuffle_id = 1;
         let partition = 1;
@@ -1014,9 +1045,9 @@ mod test {
         let uid = PartitionedUId::from(app_id.to_string(), shuffle_id, 
partition);
 
         // the buffer requested
-        let _buffer = store
-            .require_buffer(RequireBufferContext::new(uid.clone(), 40))
-            .await
+
+        let _buffer = runtime
+            .wait(store.require_buffer(RequireBufferContext::new(uid.clone(), 
40)))
             .expect("");
 
         let writing_ctx = WritingViewContext {
@@ -1030,31 +1061,33 @@ mod test {
                 task_attempt_id: 0,
             }],
         };
-        store.insert(writing_ctx).await.expect("");
+        runtime.wait(store.insert(writing_ctx)).expect("");
 
         let reading_ctx = ReadingViewContext {
             uid: uid.clone(),
             reading_options: 
ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1000000),
         };
-        let data = store.get(reading_ctx.clone()).await.expect("");
+        let data = runtime.wait(store.get(reading_ctx.clone())).expect("");
         assert_eq!(1, data.from_memory().shuffle_data_block_segments.len());
 
         // purge
-        store.purge(app_id.to_string()).await.expect("");
-        let snapshot = store.budget.snapshot().await;
+        runtime.wait(store.purge(app_id.to_string())).expect("");
+        let snapshot = runtime.wait(store.budget.snapshot());
         assert_eq!(snapshot.used, 0);
         // the remaining allocated will be removed.
         assert_eq!(snapshot.allocated, 0);
         assert_eq!(snapshot.capacity, 1024);
-        let data = store.get(reading_ctx.clone()).await.expect("");
+        let data = runtime.wait(store.get(reading_ctx.clone())).expect("");
         assert_eq!(0, data.from_memory().shuffle_data_block_segments.len());
 
         Ok(())
     }
 
-    #[tokio::test]
-    async fn test_put_and_get_for_memory() {
+    #[test]
+    fn test_put_and_get_for_memory() {
         let store = MemoryStore::new(1024 * 1024 * 1024);
+        let runtime = store.runtime_manager.clone();
+
         let writing_ctx = WritingViewContext {
             uid: Default::default(),
             data_blocks: vec![
@@ -1076,14 +1109,14 @@ mod test {
                 },
             ],
         };
-        store.insert(writing_ctx).await.unwrap();
+        runtime.wait(store.insert(writing_ctx)).unwrap();
 
         let reading_ctx = ReadingViewContext {
             uid: Default::default(),
             reading_options: 
ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1000000),
         };
 
-        match store.get(reading_ctx).await.unwrap() {
+        match runtime.wait(store.get(reading_ctx)).unwrap() {
             ResponseData::Mem(data) => {
                 assert_eq!(data.shuffle_data_block_segments.len(), 2);
                 
assert_eq!(data.shuffle_data_block_segments.get(0).unwrap().offset, 0);
diff --git a/rust/experimental/server/src/store/mod.rs 
b/rust/experimental/server/src/store/mod.rs
index 93b4bdd3f..9e23545b5 100644
--- a/rust/experimental/server/src/store/mod.rs
+++ b/rust/experimental/server/src/store/mod.rs
@@ -35,6 +35,7 @@ use anyhow::Result;
 use async_trait::async_trait;
 use bytes::Bytes;
 
+use crate::runtime::manager::RuntimeManager;
 use std::sync::Arc;
 
 #[derive(Debug)]
@@ -178,7 +179,7 @@ pub trait Persistent {}
 pub struct StoreProvider {}
 
 impl StoreProvider {
-    pub fn get(config: Config) -> HybridStore {
-        HybridStore::from(config)
+    pub fn get(runtime_manager: RuntimeManager, config: Config) -> HybridStore 
{
+        HybridStore::from(config, runtime_manager)
     }
 }
diff --git a/rust/experimental/server/tests/write_read.rs 
b/rust/experimental/server/tests/write_read.rs
index b08ecc2b1..499005a2d 100644
--- a/rust/experimental/server/tests/write_read.rs
+++ b/rust/experimental/server/tests/write_read.rs
@@ -41,6 +41,7 @@ mod tests {
             hybrid_store: Some(HybridStoreConfig::new(0.9, 0.5, None)),
             hdfs_store: None,
             store_type: Some(StorageType::MEMORY_LOCALFILE),
+            runtime_config: Default::default(),
             metrics: Some(MetricsConfig {
                 push_gateway_endpoint: None,
                 push_interval_sec: None,

Reply via email to