This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 882cc2082 [#1407] feat(rust): support multiple spill policies and 
simplify hdfs config (#1487)
882cc2082 is described below

commit 882cc208256f5b9b7d360e84ec0bb0f0c289473f
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Jan 26 16:37:32 2024 +0800

    [#1407] feat(rust): support multiple spill policies and simplify hdfs 
config (#1487)
    
    ### What changes were proposed in this pull request?
    
    1. fix(rust): fast fail of writing to localfile when disk is unhealthy
    2. fix(rust): incorrect failed buffer required cnt metrics
    3. feat(rust): support fallback to hdfs when local store is invalid
    4. btw: support huge partition flushing to hdfs directly
    5. feat(rust): avoid setting hdfs config in server client
    
    ### Why are the changes needed?
    
    For #1407; we hope to bring this into the production environment.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit tests.
---
 rust/experimental/server/Cargo.lock              |  60 ++++--
 rust/experimental/server/Cargo.toml              |   2 +-
 rust/experimental/server/README.md               |   6 +-
 rust/experimental/server/src/app.rs              | 246 +++++++++++++++++------
 rust/experimental/server/src/config.rs           |   1 -
 rust/experimental/server/src/error.rs            |   6 +
 rust/experimental/server/src/grpc.rs             |  20 +-
 rust/experimental/server/src/main.rs             |   4 +-
 rust/experimental/server/src/store/hdfs.rs       | 116 ++++++++---
 rust/experimental/server/src/store/hybrid.rs     |  73 +++++--
 rust/experimental/server/src/store/local/disk.rs |   6 +-
 rust/experimental/server/src/store/localfile.rs  | 112 ++++++++++-
 rust/experimental/server/src/store/memory.rs     |  32 +--
 rust/experimental/server/src/store/mod.rs        |   5 +-
 14 files changed, 535 insertions(+), 154 deletions(-)

diff --git a/rust/experimental/server/Cargo.lock 
b/rust/experimental/server/Cargo.lock
index 0d3e7ac2f..87048fc34 100644
--- a/rust/experimental/server/Cargo.lock
+++ b/rust/experimental/server/Cargo.lock
@@ -467,8 +467,8 @@ version = "0.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "c2895653b4d9f1538a83970077cb01dfc77a4810524e51a110944688e916b18e"
 dependencies = [
- "prost",
- "prost-types",
+ "prost 0.11.9",
+ "prost-types 0.11.9",
  "tonic",
  "tracing-core",
 ]
@@ -485,7 +485,7 @@ dependencies = [
  "futures",
  "hdrhistogram",
  "humantime",
- "prost-types",
+ "prost-types 0.11.9",
  "serde",
  "serde_json",
  "thread_local",
@@ -1227,9 +1227,9 @@ checksum = 
"2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a"
 
 [[package]]
 name = "hdfs-native"
-version = "0.5.0"
+version = "0.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "824c00b5e1b0ba3aeb5678debcb78bf3a26d3d6473e7fd0f53d1310c988c1f02"
+checksum = "7068d52978481fb74f1ac1522c0823d180a6e112c459169cf72f3cd08b207b0d"
 dependencies = [
  "base64 0.21.4",
  "bytes 1.5.0",
@@ -1240,8 +1240,8 @@ dependencies = [
  "libgssapi",
  "log",
  "num-traits",
- "prost",
- "prost-types",
+ "prost 0.12.3",
+ "prost-types 0.12.3",
  "roxmltree",
  "socket2 0.5.4",
  "thiserror",
@@ -2337,7 +2337,17 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd"
 dependencies = [
  "bytes 1.5.0",
- "prost-derive",
+ "prost-derive 0.11.9",
+]
+
+[[package]]
+name = "prost"
+version = "0.12.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a"
+dependencies = [
+ "bytes 1.5.0",
+ "prost-derive 0.12.3",
 ]
 
 [[package]]
@@ -2354,8 +2364,8 @@ dependencies = [
  "multimap",
  "petgraph",
  "prettyplease",
- "prost",
- "prost-types",
+ "prost 0.11.9",
+ "prost-types 0.11.9",
  "regex",
  "syn 1.0.109",
  "tempfile",
@@ -2375,13 +2385,35 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "prost-derive"
+version = "0.12.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e"
+dependencies = [
+ "anyhow",
+ "itertools",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.37",
+]
+
 [[package]]
 name = "prost-types"
 version = "0.11.9"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13"
 dependencies = [
- "prost",
+ "prost 0.11.9",
+]
+
+[[package]]
+name = "prost-types"
+version = "0.12.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "193898f59edcf43c26227dcd4c8427f00d99d61e95dcde58dabd49fa291d470e"
+dependencies = [
+ "prost 0.12.3",
 ]
 
 [[package]]
@@ -3419,7 +3451,7 @@ dependencies = [
  "hdrhistogram",
  "humantime",
  "once_cell",
- "prost-types",
+ "prost-types 0.11.9",
  "ratatui",
  "regex",
  "serde",
@@ -3587,7 +3619,7 @@ dependencies = [
  "hyper-timeout",
  "percent-encoding",
  "pin-project 1.1.3",
- "prost",
+ "prost 0.11.9",
  "tokio",
  "tokio-stream",
  "tower",
@@ -3809,7 +3841,7 @@ dependencies = [
  "poem",
  "pprof",
  "prometheus",
- "prost",
+ "prost 0.11.9",
  "prost-build",
  "serde",
  "signal-hook",
diff --git a/rust/experimental/server/Cargo.toml 
b/rust/experimental/server/Cargo.toml
index ad9f5fc92..860ad4320 100644
--- a/rust/experimental/server/Cargo.toml
+++ b/rust/experimental/server/Cargo.toml
@@ -100,7 +100,7 @@ spin = "0.9.8"
 opendal = { version = "0.44.0", features = ["services-fs"]}
 
 [dependencies.hdfs-native]
-version = "0.5.0"
+version = "0.7.0"
 optional = true
 features = ["kerberos"]
 
diff --git a/rust/experimental/server/README.md 
b/rust/experimental/server/README.md
index 9cec4df48..73e88da0f 100644
--- a/rust/experimental/server/README.md
+++ b/rust/experimental/server/README.md
@@ -15,7 +15,7 @@
   ~ limitations under the License.
   -->
 
-Another implementation of Apache Uniffle shuffle server
+Another implementation of Apache Uniffle shuffle server (Single binary, no 
extra dependencies)
 
 ## Benchmark report
 
@@ -90,7 +90,7 @@ In the future, rust-based server will use io_uring mechanism 
to improve writing
 
 ## Build
 
-`cargo build --release`
+`cargo build --release --features hdfs,jemalloc`
 
 Uniffle-x currently treats all compiler warnings as error, with some dead-code 
warning excluded. When you are developing
 and really want to ignore the warnings for now, you can use `ccargo --config 
'build.rustflags=["-W", "warnings"]' build`
@@ -115,7 +115,6 @@ data_paths = ["/data1/uniffle", "/data2/uniffle"]
 healthy_check_min_disks = 0
 
 [hdfs_store]
-data_path = "hdfs://rbf-x/user/bi"
 max_concurrency = 10
 
 [hybrid_store]
@@ -135,6 +134,7 @@ push_gateway_endpoint = "http://xxxxxxxxxxxxxx/pushgateway";
 ### HDFS Setup 
 
 Benefit from the hdfs-native crate, there is no need to setup the JAVA_HOME 
and relative dependencies.
+If HDFS store is valid, the spark client must specify the conf of 
`spark.rss.client.remote.storage.useLocalConfAsDefault=true`
 
 ```shell
 cargo build --features hdfs --release
diff --git a/rust/experimental/server/src/app.rs 
b/rust/experimental/server/src/app.rs
index 9298fb6e3..35699f780 100644
--- a/rust/experimental/server/src/app.rs
+++ b/rust/experimental/server/src/app.rs
@@ -40,19 +40,22 @@ use dashmap::DashMap;
 use log::{debug, error, info};
 
 use std::collections::hash_map::DefaultHasher;
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
 
 use std::hash::{Hash, Hasher};
 
 use std::str::FromStr;
 
+use crate::proto::uniffle::RemoteStorage;
 use std::sync::atomic::{AtomicU64, Ordering};
-use std::sync::Arc;
 use std::sync::RwLock;
+use std::sync::{Arc, OnceLock};
 use std::time::Duration;
 
+pub static SHUFFLE_SERVER_ID: OnceLock<String> = OnceLock::new();
+
 #[derive(Debug, Clone)]
-enum DataDistribution {
+pub enum DataDistribution {
     NORMAL,
     #[allow(non_camel_case_types)]
     LOCAL_ORDER,
@@ -61,9 +64,24 @@ enum DataDistribution {
 pub const MAX_CONCURRENCY_PER_PARTITION_TO_WRITE: i32 = 20;
 
 #[derive(Debug, Clone)]
-struct AppConfigOptions {
-    data_distribution: DataDistribution,
-    max_concurrency_per_partition_to_write: i32,
+pub struct AppConfigOptions {
+    pub data_distribution: DataDistribution,
+    pub max_concurrency_per_partition_to_write: i32,
+    pub remote_storage_config_option: Option<RemoteStorageConfig>,
+}
+
+impl AppConfigOptions {
+    pub fn new(
+        data_distribution: DataDistribution,
+        max_concurrency_per_partition_to_write: i32,
+        remote_storage_config_option: Option<RemoteStorageConfig>,
+    ) -> Self {
+        Self {
+            data_distribution,
+            max_concurrency_per_partition_to_write,
+            remote_storage_config_option,
+        }
+    }
 }
 
 impl Default for AppConfigOptions {
@@ -71,6 +89,30 @@ impl Default for AppConfigOptions {
         AppConfigOptions {
             data_distribution: DataDistribution::LOCAL_ORDER,
             max_concurrency_per_partition_to_write: 20,
+            remote_storage_config_option: None,
+        }
+    }
+}
+
+// =============================================================
+
+#[derive(Clone, Debug)]
+pub struct RemoteStorageConfig {
+    pub root: String,
+    pub configs: HashMap<String, String>,
+}
+
+impl From<RemoteStorage> for RemoteStorageConfig {
+    fn from(remote_conf: RemoteStorage) -> Self {
+        let root = remote_conf.path;
+        let mut confs = HashMap::new();
+        for kv in remote_conf.remote_storage_conf {
+            confs.insert(kv.key, kv.value);
+        }
+
+        Self {
+            root,
+            configs: confs,
         }
     }
 }
@@ -81,7 +123,7 @@ pub struct App {
     app_id: String,
     // key: shuffleId, value: partitionIds
     partitions: DashMap<i32, HashSet<i32>>,
-    app_config_options: Option<AppConfigOptions>,
+    app_config_options: AppConfigOptions,
     latest_heartbeat_time: AtomicU64,
     store: Arc<HybridStore>,
     // key: (shuffle_id, partition_id)
@@ -139,11 +181,36 @@ impl PartitionedMeta {
 impl App {
     fn from(
         app_id: String,
-        config_options: Option<AppConfigOptions>,
+        config_options: AppConfigOptions,
         store: Arc<HybridStore>,
         huge_partition_marked_threshold: Option<u64>,
         huge_partition_memory_max_available_size: Option<u64>,
+        runtime_manager: RuntimeManager,
     ) -> Self {
+        // todo: should throw exception if register failed.
+        let copy_app_id = app_id.to_string();
+        let app_options = config_options.clone();
+        let cloned_store = store.clone();
+        let register_result = futures::executor::block_on(async move {
+            runtime_manager
+                .default_runtime
+                .spawn(async move {
+                    cloned_store
+                        .register_app(RegisterAppContext {
+                            app_id: copy_app_id,
+                            app_config_options: app_options,
+                        })
+                        .await
+                })
+                .await
+        });
+        if register_result.is_err() {
+            error!(
+                "Errors on registering app to store: {:#?}",
+                register_result.err()
+            );
+        }
+
         App {
             app_id,
             partitions: DashMap::new(),
@@ -179,8 +246,12 @@ impl App {
             .incr_data_size(len)?;
         TOTAL_RECEIVED_DATA.inc_by(len as u64);
 
-        self.store.insert(ctx).await?;
+        let ctx = match self.is_huge_partition(&ctx.uid).await {
+            Ok(true) => WritingViewContext::new(ctx.uid.clone(), 
ctx.data_blocks, true),
+            _ => ctx,
+        };
 
+        self.store.insert(ctx).await?;
         Ok(len)
     }
 
@@ -204,23 +275,37 @@ impl App {
         self.store.get_index(ctx).await
     }
 
-    async fn huge_partition_limit(&self, uid: &PartitionedUId) -> Result<bool> 
{
-        let huge_partition_threshold = &self.huge_partition_marked_threshold;
-        let huge_partition_memory_used = 
&self.huge_partition_memory_max_available_size;
-        if huge_partition_threshold.is_none() || 
huge_partition_memory_used.is_none() {
+    async fn is_huge_partition(&self, uid: &PartitionedUId) -> Result<bool> {
+        let huge_partition_threshold_option = 
&self.huge_partition_marked_threshold;
+        let huge_partition_memory_used_option = 
&self.huge_partition_memory_max_available_size;
+        if huge_partition_threshold_option.is_none() || 
huge_partition_memory_used_option.is_none()
+        {
             return Ok(false);
         }
-        let huge_partition_size = &huge_partition_threshold.unwrap();
-        let huge_partition_memory = &huge_partition_memory_used.unwrap();
+
+        let huge_partition_threshold = 
&huge_partition_threshold_option.unwrap();
 
         let meta = self.get_underlying_partition_bitmap(uid.clone());
         let data_size = meta.get_data_size()?;
-        if data_size > *huge_partition_size
-            && self
-                .store
-                .get_hot_store_memory_partitioned_buffer_size(uid)
-                .await?
-                > *huge_partition_memory
+        if data_size > *huge_partition_threshold {
+            return Ok(true);
+        }
+
+        return Ok(false);
+    }
+
+    async fn is_backpressure_for_huge_partition(&self, uid: &PartitionedUId) 
-> Result<bool> {
+        if !self.is_huge_partition(uid).await? {
+            return Ok(false);
+        }
+        let huge_partition_memory_used = 
&self.huge_partition_memory_max_available_size;
+        let huge_partition_memory = &huge_partition_memory_used.unwrap();
+
+        if self
+            .store
+            .get_hot_store_memory_partitioned_buffer_size(uid)
+            .await?
+            > *huge_partition_memory
         {
             info!(
                 "[{:?}] with huge partition, it has been writing speed 
limited",
@@ -241,12 +326,15 @@ impl App {
         &self,
         ctx: RequireBufferContext,
     ) -> Result<RequireBufferResponse, WorkerError> {
-        if self.huge_partition_limit(&ctx.uid).await? {
+        if self.is_backpressure_for_huge_partition(&ctx.uid).await? {
             TOTAL_REQUIRE_BUFFER_FAILED.inc();
             return Err(WorkerError::MEMORY_USAGE_LIMITED_BY_HUGE_PARTITION);
         }
 
-        self.store.require_buffer(ctx).await
+        self.store.require_buffer(ctx).await.map_err(|err| {
+            TOTAL_REQUIRE_BUFFER_FAILED.inc();
+            err
+        })
     }
 
     pub async fn release_buffer(&self, ticket_id: i64) -> Result<i64, 
WorkerError> {
@@ -322,6 +410,29 @@ pub struct GetBlocksContext {
 pub struct WritingViewContext {
     pub uid: PartitionedUId,
     pub data_blocks: Vec<PartitionedDataBlock>,
+    pub owned_by_huge_partition: bool,
+}
+
+impl WritingViewContext {
+    pub fn from(uid: PartitionedUId, data_blocks: Vec<PartitionedDataBlock>) 
-> Self {
+        WritingViewContext {
+            uid,
+            data_blocks,
+            owned_by_huge_partition: false,
+        }
+    }
+
+    pub fn new(
+        uid: PartitionedUId,
+        data_blocks: Vec<PartitionedDataBlock>,
+        owned_by_huge_partition: bool,
+    ) -> Self {
+        WritingViewContext {
+            uid,
+            data_blocks,
+            owned_by_huge_partition,
+        }
+    }
 }
 
 #[derive(Debug, Clone)]
@@ -341,6 +452,12 @@ pub struct RequireBufferContext {
     pub size: i64,
 }
 
+#[derive(Debug, Clone)]
+pub struct RegisterAppContext {
+    pub app_id: String,
+    pub app_config_options: AppConfigOptions,
+}
+
 #[derive(Debug, Clone)]
 pub struct ReleaseBufferContext {
     pub(crate) ticket_id: i64,
@@ -389,6 +506,7 @@ pub struct AppManager {
     store: Arc<HybridStore>,
     app_heartbeat_timeout_min: u32,
     config: Config,
+    runtime_manager: RuntimeManager,
 }
 
 impl AppManager {
@@ -404,6 +522,7 @@ impl AppManager {
             store,
             app_heartbeat_timeout_min,
             config,
+            runtime_manager: runtime_manager.clone(),
         };
         manager
     }
@@ -506,7 +625,12 @@ impl AppManager {
         self.apps.get(app_id).map(|v| v.value().clone())
     }
 
-    pub fn register(&self, app_id: String, shuffle_id: i32) -> Result<()> {
+    pub fn register(
+        &self,
+        app_id: String,
+        shuffle_id: i32,
+        app_config_options: AppConfigOptions,
+    ) -> Result<()> {
         info!(
             "Accepted registry. app_id: {}, shuffle_id: {}",
             app_id.clone(),
@@ -539,10 +663,11 @@ impl AppManager {
 
             Arc::new(App::from(
                 app_id,
-                Some(AppConfigOptions::default()),
+                app_config_options,
                 self.store.clone(),
                 threshold,
                 huge_partition_max_available_size,
+                self.runtime_manager.clone(),
             ))
         });
         app_ref.register_shuffle(shuffle_id)
@@ -590,6 +715,7 @@ mod test {
     };
     use crate::config::{Config, HybridStoreConfig, LocalfileStoreConfig, 
MemoryStoreConfig};
 
+    use crate::runtime::manager::RuntimeManager;
     use crate::store::{PartitionedDataBlock, ResponseData};
     use croaring::treemap::JvmSerializer;
     use croaring::Treemap;
@@ -613,21 +739,24 @@ mod test {
         config
     }
 
-    #[tokio::test]
-    async fn app_put_get_purge_test() {
+    #[test]
+    fn app_put_get_purge_test() {
         let app_id = "app_put_get_purge_test-----id";
 
-        let app_manager_ref = AppManager::get_ref(Default::default(), 
mock_config()).clone();
-        app_manager_ref.register(app_id.clone().into(), 1).unwrap();
+        let runtime_manager: RuntimeManager = Default::default();
+        let app_manager_ref = AppManager::get_ref(runtime_manager.clone(), 
mock_config()).clone();
+        app_manager_ref
+            .register(app_id.clone().into(), 1, Default::default())
+            .unwrap();
 
         if let Some(app) = app_manager_ref.get_app("app_id".into()) {
-            let writing_ctx = WritingViewContext {
-                uid: PartitionedUId {
+            let writing_ctx = WritingViewContext::from(
+                PartitionedUId {
                     app_id: app_id.clone().into(),
                     shuffle_id: 1,
                     partition_id: 0,
                 },
-                data_blocks: vec![
+                vec![
                     PartitionedDataBlock {
                         block_id: 0,
                         length: 10,
@@ -645,11 +774,11 @@ mod test {
                         task_attempt_id: 0,
                     },
                 ],
-            };
+            );
 
             // case1: put
-            let result = app.insert(writing_ctx);
-            if result.await.is_err() {
+            let f = app.insert(writing_ctx);
+            if runtime_manager.wait(f).is_err() {
                 panic!()
             }
 
@@ -660,7 +789,8 @@ mod test {
             };
 
             // case2: get
-            let result = app.select(reading_ctx).await;
+            let f = app.select(reading_ctx);
+            let result = runtime_manager.wait(f);
             if result.is_err() {
                 panic!()
             }
@@ -673,42 +803,46 @@ mod test {
             }
 
             // case3: purge
-            app_manager_ref
-                .purge_app_data(app_id.to_string(), None)
-                .await
+            runtime_manager
+                .wait(app_manager_ref.purge_app_data(app_id.to_string(), None))
                 .expect("");
 
             assert_eq!(false, app_manager_ref.get_app(app_id).is_none());
         }
     }
 
-    #[tokio::test]
-    async fn app_manager_test() {
+    #[test]
+    fn app_manager_test() {
         let app_manager_ref = AppManager::get_ref(Default::default(), 
mock_config()).clone();
-        app_manager_ref.register("app_id".into(), 1).unwrap();
+        app_manager_ref
+            .register("app_id".into(), 1, Default::default())
+            .unwrap();
         if let Some(app) = app_manager_ref.get_app("app_id".into()) {
             assert_eq!("app_id", app.app_id);
         }
     }
 
-    #[tokio::test]
-    async fn test_get_or_put_block_ids() {
+    #[test]
+    fn test_get_or_put_block_ids() {
         let app_id = "test_get_or_put_block_ids-----id".to_string();
 
-        let app_manager_ref = AppManager::get_ref(Default::default(), 
mock_config()).clone();
-        app_manager_ref.register(app_id.clone().into(), 1).unwrap();
+        let runtime_manager: RuntimeManager = Default::default();
+        let app_manager_ref = AppManager::get_ref(runtime_manager.clone(), 
mock_config()).clone();
+        app_manager_ref
+            .register(app_id.clone().into(), 1, Default::default())
+            .unwrap();
 
         let app = app_manager_ref.get_app(app_id.as_ref()).unwrap();
-        app.report_block_ids(ReportBlocksContext {
-            uid: PartitionedUId {
-                app_id: app_id.clone(),
-                shuffle_id: 1,
-                partition_id: 0,
-            },
-            blocks: vec![123, 124],
-        })
-        .await
-        .expect("TODO: panic message");
+        runtime_manager
+            .wait(app.report_block_ids(ReportBlocksContext {
+                uid: PartitionedUId {
+                    app_id: app_id.clone(),
+                    shuffle_id: 1,
+                    partition_id: 0,
+                },
+                blocks: vec![123, 124],
+            }))
+            .expect("TODO: panic message");
 
         let data = app
             .get_block_ids(GetBlocksContext {
diff --git a/rust/experimental/server/src/config.rs 
b/rust/experimental/server/src/config.rs
index 85e37d0a7..a273e2db2 100644
--- a/rust/experimental/server/src/config.rs
+++ b/rust/experimental/server/src/config.rs
@@ -45,7 +45,6 @@ impl MemoryStoreConfig {
 
 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
 pub struct HdfsStoreConfig {
-    pub data_path: String,
     pub max_concurrency: Option<i32>,
 }
 
diff --git a/rust/experimental/server/src/error.rs 
b/rust/experimental/server/src/error.rs
index cc07536a5..c39adfcf4 100644
--- a/rust/experimental/server/src/error.rs
+++ b/rust/experimental/server/src/error.rs
@@ -34,6 +34,9 @@ pub enum WorkerError {
     #[error("Partial data has been lost, corrupted path: {0}")]
     PARTIAL_DATA_LOST(String),
 
+    #[error("Local disk:[{0}] is not healthy")]
+    LOCAL_DISK_UNHEALTHY(String),
+
     #[error("Local disk:[{0}] owned by current partition has been corrupted")]
     LOCAL_DISK_OWNED_BY_PARTITION_CORRUPTED(String),
 
@@ -51,6 +54,9 @@ pub enum WorkerError {
 
     #[error("Ticket id: {0} not exist")]
     TICKET_ID_NOT_EXIST(i64),
+
+    #[error("Hdfs native client not found for app: {0}")]
+    HDFS_NATIVE_CLIENT_NOT_FOUND(String),
 }
 
 impl From<AcquireError> for WorkerError {
diff --git a/rust/experimental/server/src/grpc.rs 
b/rust/experimental/server/src/grpc.rs
index ff5245ea6..e04447335 100644
--- a/rust/experimental/server/src/grpc.rs
+++ b/rust/experimental/server/src/grpc.rs
@@ -16,8 +16,9 @@
 // under the License.
 
 use crate::app::{
-    AppManagerRef, GetBlocksContext, PartitionedUId, ReadingIndexViewContext, 
ReadingOptions,
-    ReadingViewContext, ReportBlocksContext, RequireBufferContext, 
WritingViewContext,
+    AppConfigOptions, AppManagerRef, DataDistribution, GetBlocksContext, 
PartitionedUId,
+    ReadingIndexViewContext, ReadingOptions, ReadingViewContext, 
RemoteStorageConfig,
+    ReportBlocksContext, RequireBufferContext, WritingViewContext,
 };
 use crate::proto::uniffle::shuffle_server_server::ShuffleServer;
 use crate::proto::uniffle::{
@@ -91,9 +92,17 @@ impl ShuffleServer for DefaultShuffleServer {
         request: Request<ShuffleRegisterRequest>,
     ) -> Result<Response<ShuffleRegisterResponse>, Status> {
         let inner = request.into_inner();
+        // todo: fast fail when hdfs is enabled but empty remote storage info.
+        let remote_storage_info = inner.remote_storage.map(|x| 
RemoteStorageConfig::from(x));
+        // todo: add more options: huge_partition_threshold. and so on...
+        let app_config_option = AppConfigOptions::new(
+            DataDistribution::LOCAL_ORDER,
+            inner.max_concurrency_per_partition_to_write,
+            remote_storage_info,
+        );
         let status = self
             .app_manager_ref
-            .register(inner.app_id, inner.shuffle_id)
+            .register(inner.app_id, inner.shuffle_id, app_config_option)
             .map_or(StatusCode::INTERNAL_ERROR, |_| StatusCode::SUCCESS)
             .into();
         Ok(Response::new(ShuffleRegisterResponse {
@@ -184,10 +193,7 @@ impl ShuffleServer for DefaultShuffleServer {
                 shuffle_id,
                 partition_id,
             };
-            let ctx = WritingViewContext {
-                uid: uid.clone(),
-                data_blocks: blocks,
-            };
+            let ctx = WritingViewContext::from(uid.clone(), blocks);
 
             let inserted = app
                 .insert(ctx)
diff --git a/rust/experimental/server/src/main.rs 
b/rust/experimental/server/src/main.rs
index d2fafa4d4..01ec4aefb 100644
--- a/rust/experimental/server/src/main.rs
+++ b/rust/experimental/server/src/main.rs
@@ -17,7 +17,7 @@
 
 #![feature(impl_trait_in_assoc_type)]
 
-use crate::app::{AppManager, AppManagerRef};
+use crate::app::{AppManager, AppManagerRef, SHUFFLE_SERVER_ID};
 use crate::await_tree::AWAIT_TREE_REGISTRY;
 use crate::config::{Config, LogConfig, RotationConfig};
 use crate::grpc::await_tree_middleware::AwaitTreeMiddlewareLayer;
@@ -201,6 +201,8 @@ fn main() -> Result<()> {
 
     let rpc_port = config.grpc_port.unwrap_or(19999);
     let worker_uid = gen_worker_uid(rpc_port);
+    // todo: remove some unnecessary worker_id transfer.
+    SHUFFLE_SERVER_ID.get_or_init(|| worker_uid.clone());
 
     let metric_config = config.metrics.clone();
     init_metric_service(runtime_manager.clone(), &metric_config, 
worker_uid.clone());
diff --git a/rust/experimental/server/src/store/hdfs.rs 
b/rust/experimental/server/src/store/hdfs.rs
index 78d903543..3a1b74c80 100644
--- a/rust/experimental/server/src/store/hdfs.rs
+++ b/rust/experimental/server/src/store/hdfs.rs
@@ -17,14 +17,15 @@
 
 use crate::app::{
     PartitionedUId, PurgeDataContext, ReadingIndexViewContext, 
ReadingViewContext,
-    ReleaseBufferContext, RequireBufferContext, WritingViewContext,
+    RegisterAppContext, ReleaseBufferContext, RequireBufferContext, 
WritingViewContext,
 };
 use crate::config::HdfsStoreConfig;
 use crate::error::WorkerError;
+use std::collections::HashMap;
 
 use crate::metric::TOTAL_HDFS_USED;
 use crate::store::{Persistent, RequireBufferResponse, ResponseData, 
ResponseDataIndex, Store};
-use anyhow::Result;
+use anyhow::{anyhow, Result};
 
 use async_trait::async_trait;
 use await_tree::InstrumentAwait;
@@ -40,6 +41,7 @@ use std::sync::Arc;
 use tokio::sync::{Mutex, Semaphore};
 
 use tracing::debug;
+use url::Url;
 
 struct PartitionCachedMeta {
     is_file_created: bool,
@@ -62,10 +64,11 @@ impl Default for PartitionCachedMeta {
 }
 
 pub struct HdfsStore {
-    root: String,
-    filesystem: Box<HdfsNativeClient>,
     concurrency_access_limiter: Semaphore,
 
+    // key: app_id, value: hdfs_native_client
+    app_remote_clients: DashMap<String, HdfsNativeClient>,
+
     partition_file_locks: DashMap<String, Arc<Mutex<()>>>,
     partition_cached_meta: DashMap<String, PartitionCachedMeta>,
 }
@@ -76,26 +79,21 @@ impl Persistent for HdfsStore {}
 
 impl HdfsStore {
     pub fn from(conf: HdfsStoreConfig) -> Self {
-        let data_path = conf.data_path;
-
-        let filesystem = HdfsNativeClient::new();
-
         HdfsStore {
-            root: data_path,
-            filesystem: Box::new(filesystem),
             partition_file_locks: DashMap::new(),
             concurrency_access_limiter: 
Semaphore::new(conf.max_concurrency.unwrap_or(1) as usize),
             partition_cached_meta: Default::default(),
+            app_remote_clients: Default::default(),
         }
     }
 
     fn get_app_dir(&self, app_id: &str) -> String {
-        format!("{}/{}/", &self.root, app_id)
+        format!("{}/", app_id)
     }
 
     /// the dir created with app_id/shuffle_id
     fn get_shuffle_dir(&self, app_id: &str, shuffle_id: i32) -> String {
-        format!("{}/{}/{}/", &self.root, app_id, shuffle_id)
+        format!("{}/{}/", app_id, shuffle_id)
     }
 
     fn get_file_path_by_uid(&self, uid: &PartitionedUId) -> (String, String) {
@@ -103,14 +101,15 @@ impl HdfsStore {
         let shuffle_id = &uid.shuffle_id;
         let p_id = &uid.partition_id;
 
+        let worker_id = crate::app::SHUFFLE_SERVER_ID.get().unwrap();
         (
             format!(
-                "{}/{}/{}/{}-{}/partition-{}.data",
-                &self.root, app_id, shuffle_id, p_id, p_id, p_id
+                "{}/{}/{}-{}/{}.data",
+                app_id, shuffle_id, p_id, p_id, worker_id
             ),
             format!(
-                "{}/{}/{}/{}-{}/partition-{}.index",
-                &self.root, app_id, shuffle_id, p_id, p_id, p_id
+                "{}/{}/{}-{}/{}.index",
+                app_id, shuffle_id, p_id, p_id, worker_id
             ),
         )
     }
@@ -151,17 +150,22 @@ impl Store for HdfsStore {
             ))
             .await;
 
+        let filesystem = self.app_remote_clients.get(&uid.app_id).ok_or(
+            WorkerError::HDFS_NATIVE_CLIENT_NOT_FOUND(uid.app_id.to_string()),
+        )?;
+
         let mut next_offset = match 
self.partition_cached_meta.get(&data_file_path) {
             None => {
                 // setup the parent folder
                 let parent_dir = 
Path::new(data_file_path.as_str()).parent().unwrap();
                 let parent_path_str = format!("{}/", 
parent_dir.to_str().unwrap());
                 debug!("creating dir: {}", parent_path_str.as_str());
-                self.filesystem.create_dir(parent_path_str.as_str()).await?;
+
+                filesystem.create_dir(parent_path_str.as_str()).await?;
 
                 // setup the file
-                self.filesystem.touch(&data_file_path).await?;
-                self.filesystem.touch(&index_file_path).await?;
+                filesystem.touch(&data_file_path).await?;
+                filesystem.touch(&index_file_path).await?;
 
                 self.partition_cached_meta
                     .insert(data_file_path.to_string(), Default::default());
@@ -196,11 +200,11 @@ impl Store for HdfsStore {
             total_flushed += length;
         }
 
-        self.filesystem
+        filesystem
             .append(&data_file_path, data_bytes_holder.freeze())
             .instrument_await(format!("hdfs writing data. path: {}", 
data_file_path))
             .await?;
-        self.filesystem
+        filesystem
             .append(&index_file_path, index_bytes_holder.freeze())
             .instrument_await(format!("hdfs writing index. path: {}", 
data_file_path))
             .await?;
@@ -240,6 +244,9 @@ impl Store for HdfsStore {
 
     async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
         let app_id = ctx.app_id;
+        let filesystem = self.app_remote_clients.get(&app_id).ok_or(
+            WorkerError::HDFS_NATIVE_CLIENT_NOT_FOUND(app_id.to_string()),
+        )?;
 
         let dir = match ctx.shuffle_id {
             Some(shuffle_id) => self.get_shuffle_dir(app_id.as_str(), 
shuffle_id),
@@ -259,12 +266,37 @@ impl Store for HdfsStore {
         }
 
         info!("The hdfs data for {} has been deleted", &dir);
-        self.filesystem.delete_dir(dir.as_str()).await
+        filesystem.delete_dir(dir.as_str()).await?;
+        drop(filesystem);
+
+        if ctx.shuffle_id.is_none() {
+            self.app_remote_clients.remove(&app_id);
+        }
+
+        Ok(())
     }
 
     async fn is_healthy(&self) -> Result<bool> {
         Ok(true)
     }
+
+    async fn register_app(&self, ctx: RegisterAppContext) -> Result<()> {
+        let remote_storage_conf_option = 
ctx.app_config_options.remote_storage_config_option;
+        if remote_storage_conf_option.is_none() {
+            return Err(anyhow!(
+                "The remote config must be populated by app registry action!"
+            ));
+        }
+
+        let remote_storage_conf = remote_storage_conf_option.unwrap();
+        let client = HdfsNativeClient::new(remote_storage_conf.root, 
remote_storage_conf.configs)?;
+
+        let app_id = ctx.app_id.clone();
+        self.app_remote_clients
+            .entry(app_id)
+            .or_insert_with(|| client);
+        Ok(())
+    }
 }
 
 #[async_trait]
@@ -279,18 +311,38 @@ trait HdfsDelegator {
 
 struct HdfsNativeClient {
     client: Client,
+    root: String,
 }
 
 impl HdfsNativeClient {
-    fn new() -> Self {
-        let client = Client::default();
-        Self { client }
+    fn new(root: String, configs: HashMap<String, String>) -> Result<Self> {
+        // todo: do more optimizations!
+        let url = Url::parse(root.as_str())?;
+        let url_header = format!("{}://{}", url.scheme(), url.host().unwrap());
+
+        let root_path = url.path();
+
+        info!(
+            "Created hdfs client, header: {}, path: {}",
+            &url_header, root_path
+        );
+
+        let client = Client::new_with_config(url_header.as_str(), configs)?;
+        Ok(Self {
+            client,
+            root: root_path.to_string(),
+        })
+    }
+
+    fn wrap_root(&self, path: &str) -> String {
+        format!("{}/{}", &self.root, path)
     }
 }
 
 #[async_trait]
 impl HdfsDelegator for HdfsNativeClient {
     async fn touch(&self, file_path: &str) -> Result<()> {
+        let file_path = &self.wrap_root(file_path);
         self.client
             .create(file_path, WriteOptions::default())
             .await?
@@ -300,6 +352,7 @@ impl HdfsDelegator for HdfsNativeClient {
     }
 
     async fn append(&self, file_path: &str, data: Bytes) -> Result<()> {
+        let file_path = &self.wrap_root(file_path);
         let mut file_writer = self.client.append(file_path).await?;
         file_writer.write(data).await?;
         file_writer.close().await?;
@@ -307,16 +360,19 @@ impl HdfsDelegator for HdfsNativeClient {
     }
 
     async fn len(&self, file_path: &str) -> Result<u64> {
+        let file_path = &self.wrap_root(file_path);
         let file_info = self.client.get_file_info(file_path).await?;
         Ok(file_info.length as u64)
     }
 
     async fn create_dir(&self, dir: &str) -> Result<()> {
+        let dir = &self.wrap_root(dir);
         let _ = self.client.mkdirs(dir, 777, true).await?;
         Ok(())
     }
 
     async fn delete_dir(&self, dir: &str) -> Result<()> {
+        let dir = &self.wrap_root(dir);
         self.client.delete(dir, true).await?;
         Ok(())
     }
@@ -325,6 +381,16 @@ impl HdfsDelegator for HdfsNativeClient {
 #[cfg(test)]
 mod tests {
     use std::path::Path;
+    use url::Url;
+
+    #[test]
+    fn url_test() {
+        let url = Url::parse("hdfs://rbf-1:19999/a/b").unwrap();
+        assert_eq!("hdfs", url.scheme());
+        assert_eq!("rbf-1", url.host().unwrap().to_string());
+        assert_eq!(19999, url.port().unwrap());
+        assert_eq!("/a/b", url.path());
+    }
 
     #[test]
     fn dir_test() -> anyhow::Result<()> {
diff --git a/rust/experimental/server/src/store/hybrid.rs 
b/rust/experimental/server/src/store/hybrid.rs
index f9feaa091..cb29357fe 100644
--- a/rust/experimental/server/src/store/hybrid.rs
+++ b/rust/experimental/server/src/store/hybrid.rs
@@ -17,7 +17,7 @@
 
 use crate::app::{
     PartitionedUId, PurgeDataContext, ReadingIndexViewContext, ReadingOptions, 
ReadingViewContext,
-    ReleaseBufferContext, RequireBufferContext, WritingViewContext,
+    RegisterAppContext, ReleaseBufferContext, RequireBufferContext, 
WritingViewContext,
 };
 use crate::await_tree::AWAIT_TREE_REGISTRY;
 
@@ -85,9 +85,12 @@ pub struct HybridStore {
     runtime_manager: RuntimeManager,
 }
 
+#[derive(Clone)]
 struct SpillMessage {
     ctx: WritingViewContext,
     id: i64,
+    retry_cnt: i32,
+    previous_spilled_storage: Option<Arc<Box<dyn PersistentStore>>>,
 }
 
 unsafe impl Send for HybridStore {}
@@ -181,9 +184,12 @@ impl HybridStore {
 
     async fn memory_spill_to_persistent_store(
         &self,
-        mut ctx: WritingViewContext,
-        in_flight_blocks_id: i64,
+        spill_message: SpillMessage,
     ) -> Result<String, WorkerError> {
+        let mut ctx: WritingViewContext = spill_message.ctx;
+        let in_flight_blocks_id: i64 = spill_message.id;
+        let retry_cnt = spill_message.retry_cnt;
+
         let uid = ctx.uid.clone();
         let blocks = &ctx.data_blocks;
         let mut spill_size = 0i64;
@@ -197,9 +203,15 @@ impl HybridStore {
             .ok_or(anyhow!("empty warm store. It should not happen"))?;
         let cold = self.cold_store.as_ref().unwrap_or(warm);
 
-        let candidate_store = if warm.is_healthy().await? {
+        // we should cover the following cases
+        // 1. local store is unhealthy. spill to hdfs
+        // 2. flushing the event to localfile failed and the max retry count was exceeded; fall back to hdfs
+        // 3. huge partition directly flush to hdfs
+
+        // normal assignment
+        let mut candidate_store = if warm.is_healthy().await? {
             let cold_spilled_size = 
self.memory_spill_to_cold_threshold_size.unwrap_or(u64::MAX);
-            if cold_spilled_size < spill_size as u64 {
+            if cold_spilled_size < spill_size as u64 || 
ctx.owned_by_huge_partition {
                 cold
             } else {
                 warm
@@ -208,6 +220,11 @@ impl HybridStore {
             cold
         };
 
+        // fallback assignment: assume hdfs is always active and stable
+        if retry_cnt >= 1 {
+            candidate_store = cold;
+        }
+
         match self.get_store_type(candidate_store) {
             StorageType::LOCALFILE => {
                 TOTAL_MEMORY_SPILL_TO_LOCALFILE.inc();
@@ -290,16 +307,15 @@ impl HybridStore {
         blocks: Vec<PartitionedDataBlock>,
         uid: PartitionedUId,
     ) -> Result<()> {
-        let writing_ctx = WritingViewContext {
-            uid,
-            data_blocks: blocks,
-        };
+        let writing_ctx = WritingViewContext::from(uid, blocks);
 
         if self
             .memory_spill_send
             .send(SpillMessage {
                 ctx: writing_ctx,
                 id: in_flight_uid,
+                retry_cnt: 0,
+                previous_spilled_storage: None,
             })
             .await
             .is_err()
@@ -339,7 +355,7 @@ impl Store for HybridStore {
         let concurrency_limiter =
             Arc::new(Semaphore::new(store.memory_spill_max_concurrency as 
usize));
         self.runtime_manager.write_runtime.spawn(async move {
-            while let Ok(message) = store.memory_spill_recv.recv().await {
+            while let Ok(mut message) = store.memory_spill_recv.recv().await {
                 let await_root = await_tree_registry
                     .register(format!("hot->warm flush. uid: {:#?}", 
&message.ctx.uid))
                     .await;
@@ -364,7 +380,7 @@ impl Store for HybridStore {
                             size += block.length as u64;
                         }
                         match store_cloned
-                            
.memory_spill_to_persistent_store(message.ctx.clone(), message.id)
+                            .memory_spill_to_persistent_store(message.clone())
                             
.instrument_await("memory_spill_to_persistent_store.")
                             .await
                         {
@@ -375,9 +391,11 @@ impl Store for HybridStore {
                             Err(error) => {
                                 TOTAL_MEMORY_SPILL_OPERATION_FAILED.inc();
                                 error!(
-                                "Errors on spill memory data to persistent 
storage. error: {:#?}",
-                                error
-                            );
+                                "Errors on spill memory data to persistent 
storage for event_id:{:?}. The error: {:#?}",
+                                    message.id,
+                                    error);
+
+                                message.retry_cnt = message.retry_cnt + 1;
                                 // re-push to the queue to execute
                                 let _ = 
store_cloned.memory_spill_send.send(message).await;
                             }
@@ -495,6 +513,25 @@ impl Store for HybridStore {
             .unwrap_or(false);
         Ok(self.hot_store.is_healthy().await? && (warm || cold))
     }
+
+    async fn register_app(&self, ctx: RegisterAppContext) -> Result<()> {
+        self.hot_store.register_app(ctx.clone()).await?;
+        if self.warm_store.is_some() {
+            self.warm_store
+                .as_ref()
+                .unwrap()
+                .register_app(ctx.clone())
+                .await?;
+        }
+        if self.cold_store.is_some() {
+            self.cold_store
+                .as_ref()
+                .unwrap()
+                .register_app(ctx.clone())
+                .await?;
+        }
+        Ok(())
+    }
 }
 
 pub async fn watermark_flush(store: Arc<HybridStore>) -> Result<()> {
@@ -628,9 +665,9 @@ mod tests {
         let mut block_ids = vec![];
         for i in 0..batch_size {
             block_ids.push(i);
-            let writing_ctx = WritingViewContext {
-                uid: uid.clone(),
-                data_blocks: vec![PartitionedDataBlock {
+            let writing_ctx = WritingViewContext::from(
+                uid.clone(),
+                vec![PartitionedDataBlock {
                     block_id: i,
                     length: data_len as i32,
                     uncompress_length: 100,
@@ -638,7 +675,7 @@ mod tests {
                     data: Bytes::copy_from_slice(data),
                     task_attempt_id: 0,
                 }],
-            };
+            );
             let _ = store.insert(writing_ctx).await;
         }
 
diff --git a/rust/experimental/server/src/store/local/disk.rs 
b/rust/experimental/server/src/store/local/disk.rs
index a0e16af74..b8cd2c14a 100644
--- a/rust/experimental/server/src/store/local/disk.rs
+++ b/rust/experimental/server/src/store/local/disk.rs
@@ -228,15 +228,15 @@ impl LocalDisk {
         Ok(())
     }
 
-    fn mark_corrupted(&self) {
+    pub fn mark_corrupted(&self) {
         self.is_corrupted.store(true, Ordering::SeqCst);
     }
 
-    fn mark_unhealthy(&self) {
+    pub fn mark_unhealthy(&self) {
         self.is_healthy.store(false, Ordering::SeqCst);
     }
 
-    fn mark_healthy(&self) {
+    pub fn mark_healthy(&self) {
         self.is_healthy.store(true, Ordering::SeqCst);
     }
 
diff --git a/rust/experimental/server/src/store/localfile.rs 
b/rust/experimental/server/src/store/localfile.rs
index cc2a881f6..bb810dc78 100644
--- a/rust/experimental/server/src/store/localfile.rs
+++ b/rust/experimental/server/src/store/localfile.rs
@@ -18,7 +18,7 @@
 use crate::app::ReadingOptions::FILE_OFFSET_AND_LEN;
 use crate::app::{
     PartitionedUId, PurgeDataContext, ReadingIndexViewContext, 
ReadingViewContext,
-    ReleaseBufferContext, RequireBufferContext, WritingViewContext,
+    RegisterAppContext, ReleaseBufferContext, RequireBufferContext, 
WritingViewContext,
 };
 use crate::config::LocalfileStoreConfig;
 use crate::error::WorkerError;
@@ -203,6 +203,12 @@ impl Store for LocalFileStore {
             return 
Err(WorkerError::PARTIAL_DATA_LOST(local_disk.root.to_string()));
         }
 
+        if !local_disk.is_healthy()? {
+            return Err(WorkerError::LOCAL_DISK_UNHEALTHY(
+                local_disk.root.to_string(),
+            ));
+        }
+
         if !parent_dir_is_created {
             if let Some(path) = Path::new(&data_file_path).parent() {
                 local_disk
@@ -400,6 +406,10 @@ impl Store for LocalFileStore {
     async fn release_buffer(&self, _ctx: ReleaseBufferContext) -> Result<i64, 
WorkerError> {
         todo!()
     }
+
+    async fn register_app(&self, _ctx: RegisterAppContext) -> Result<()> {
+        Ok(())
+    }
 }
 
 #[cfg(test)]
@@ -410,10 +420,94 @@ mod test {
     };
     use crate::store::localfile::LocalFileStore;
 
+    use crate::error::WorkerError;
     use crate::store::{PartitionedDataBlock, ResponseData, ResponseDataIndex, 
Store};
     use bytes::{Buf, Bytes, BytesMut};
     use log::info;
 
+    fn create_writing_ctx() -> WritingViewContext {
+        let uid = PartitionedUId {
+            app_id: "100".to_string(),
+            shuffle_id: 0,
+            partition_id: 0,
+        };
+
+        let data = b"hello world!hello china!";
+        let size = data.len();
+        let writing_ctx = WritingViewContext::from(
+            uid.clone(),
+            vec![
+                PartitionedDataBlock {
+                    block_id: 0,
+                    length: size as i32,
+                    uncompress_length: 200,
+                    crc: 0,
+                    data: Bytes::copy_from_slice(data),
+                    task_attempt_id: 0,
+                },
+                PartitionedDataBlock {
+                    block_id: 1,
+                    length: size as i32,
+                    uncompress_length: 200,
+                    crc: 0,
+                    data: Bytes::copy_from_slice(data),
+                    task_attempt_id: 0,
+                },
+            ],
+        );
+
+        writing_ctx
+    }
+
+    #[test]
+    fn local_disk_under_exception_test() -> anyhow::Result<()> {
+        let temp_dir = 
tempdir::TempDir::new("local_disk_under_exception_test").unwrap();
+        let temp_path = temp_dir.path().to_str().unwrap().to_string();
+        println!("init local file path: {}", &temp_path);
+        let local_store = LocalFileStore::new(vec![temp_path.to_string()]);
+
+        let runtime = local_store.runtime_manager.clone();
+
+        let writing_view_ctx = create_writing_ctx();
+        let insert_result = runtime.wait(local_store.insert(writing_view_ctx));
+
+        if insert_result.is_err() {
+            println!("{:?}", insert_result.err());
+            panic!()
+        }
+
+        // case1: mark the local disk unhealthy, so that the following flush throws an exception directly.
+        let local_disk = local_store.local_disks[0].clone();
+        local_disk.mark_unhealthy();
+
+        let writing_view_ctx = create_writing_ctx();
+        let insert_result = runtime.wait(local_store.insert(writing_view_ctx));
+        match insert_result {
+            Err(WorkerError::LOCAL_DISK_UNHEALTHY(_)) => {}
+            _ => panic!(),
+        }
+
+        // case2: mark the local disk healthy, all things work!
+        local_disk.mark_healthy();
+        let writing_view_ctx = create_writing_ctx();
+        let insert_result = runtime.wait(local_store.insert(writing_view_ctx));
+        match insert_result {
+            Err(WorkerError::LOCAL_DISK_UNHEALTHY(_)) => panic!(),
+            _ => {}
+        }
+
+        // case3: mark the local disk corrupted, fail directly.
+        local_disk.mark_corrupted();
+        let writing_view_ctx = create_writing_ctx();
+        let insert_result = runtime.wait(local_store.insert(writing_view_ctx));
+        match insert_result {
+            Err(WorkerError::PARTIAL_DATA_LOST(_)) => {}
+            _ => panic!(),
+        }
+
+        Ok(())
+    }
+
     #[test]
     fn purge_test() -> anyhow::Result<()> {
         let temp_dir = tempdir::TempDir::new("test_local_store").unwrap();
@@ -432,9 +526,9 @@ mod test {
 
         let data = b"hello world!hello china!";
         let size = data.len();
-        let writing_ctx = WritingViewContext {
-            uid: uid.clone(),
-            data_blocks: vec![
+        let writing_ctx = WritingViewContext::from(
+            uid.clone(),
+            vec![
                 PartitionedDataBlock {
                     block_id: 0,
                     length: size as i32,
@@ -452,7 +546,7 @@ mod test {
                     task_attempt_id: 0,
                 },
             ],
-        };
+        );
 
         let insert_result = runtime.wait(local_store.insert(writing_ctx));
         if insert_result.is_err() {
@@ -506,9 +600,9 @@ mod test {
 
         let data = b"hello world!hello china!";
         let size = data.len();
-        let writing_ctx = WritingViewContext {
-            uid: uid.clone(),
-            data_blocks: vec![
+        let writing_ctx = WritingViewContext::from(
+            uid.clone(),
+            vec![
                 PartitionedDataBlock {
                     block_id: 0,
                     length: size as i32,
@@ -526,7 +620,7 @@ mod test {
                     task_attempt_id: 0,
                 },
             ],
-        };
+        );
 
         let insert_result = runtime.wait(local_store.insert(writing_ctx));
         if insert_result.is_err() {
diff --git a/rust/experimental/server/src/store/memory.rs 
b/rust/experimental/server/src/store/memory.rs
index cfb6ff0e0..b1be8e5ae 100644
--- a/rust/experimental/server/src/store/memory.rs
+++ b/rust/experimental/server/src/store/memory.rs
@@ -18,7 +18,7 @@
 use crate::app::ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE;
 use crate::app::{
     PartitionedUId, PurgeDataContext, ReadingIndexViewContext, 
ReadingViewContext,
-    ReleaseBufferContext, RequireBufferContext, WritingViewContext,
+    RegisterAppContext, ReleaseBufferContext, RequireBufferContext, 
WritingViewContext,
 };
 use crate::config::MemoryStoreConfig;
 use crate::error::WorkerError;
@@ -455,6 +455,10 @@ impl Store for MemoryStore {
         let ticket_id = ctx.ticket_id;
         self.ticket_manager.delete(ticket_id)
     }
+
+    async fn register_app(&self, _ctx: RegisterAppContext) -> Result<()> {
+        Ok(())
+    }
 }
 
 /// thread safe, this will be guarded by the lock
@@ -882,7 +886,7 @@ mod test {
                 task_attempt_id: 0,
             });
         }
-        WritingViewContext { uid, data_blocks }
+        WritingViewContext::from(uid, data_blocks)
     }
 
     #[test]
@@ -927,9 +931,9 @@ mod test {
             .wait(store.require_buffer(RequireBufferContext::new(uid.clone(), 
40)))
             .expect("");
 
-        let writing_ctx = WritingViewContext {
-            uid: uid.clone(),
-            data_blocks: vec![PartitionedDataBlock {
+        let writing_ctx = WritingViewContext::from(
+            uid.clone(),
+            vec![PartitionedDataBlock {
                 block_id: 0,
                 length: 10,
                 uncompress_length: 100,
@@ -937,7 +941,7 @@ mod test {
                 data: Default::default(),
                 task_attempt_id: 0,
             }],
-        };
+        );
         runtime.wait(store.insert(writing_ctx)).expect("");
 
         let reading_ctx = ReadingViewContext {
@@ -988,9 +992,9 @@ mod test {
         let store = MemoryStore::new(1024 * 1024 * 1024);
         let runtime = store.runtime_manager.clone();
 
-        let writing_ctx = WritingViewContext {
-            uid: Default::default(),
-            data_blocks: vec![
+        let writing_ctx = WritingViewContext::from(
+            Default::default(),
+            vec![
                 PartitionedDataBlock {
                     block_id: 0,
                     length: 10,
@@ -1008,7 +1012,7 @@ mod test {
                     task_attempt_id: 1,
                 },
             ],
-        };
+        );
         runtime.wait(store.insert(writing_ctx)).unwrap();
 
         let reading_ctx = ReadingViewContext {
@@ -1033,9 +1037,9 @@ mod test {
         let runtime = store.runtime_manager.clone();
 
         // 1. insert 2 block
-        let writing_ctx = WritingViewContext {
-            uid: Default::default(),
-            data_blocks: vec![
+        let writing_ctx = WritingViewContext::from(
+            Default::default(),
+            vec![
                 PartitionedDataBlock {
                     block_id: 0,
                     length: 10,
@@ -1053,7 +1057,7 @@ mod test {
                     task_attempt_id: 1,
                 },
             ],
-        };
+        );
         runtime.wait(store.insert(writing_ctx)).unwrap();
 
         // 2. block_ids_filter is empty, should return 2 blocks
diff --git a/rust/experimental/server/src/store/mod.rs 
b/rust/experimental/server/src/store/mod.rs
index caf00a068..12a509d9b 100644
--- a/rust/experimental/server/src/store/mod.rs
+++ b/rust/experimental/server/src/store/mod.rs
@@ -24,8 +24,8 @@ pub mod mem;
 pub mod memory;
 
 use crate::app::{
-    PurgeDataContext, ReadingIndexViewContext, ReadingViewContext, 
ReleaseBufferContext,
-    RequireBufferContext, WritingViewContext,
+    PurgeDataContext, ReadingIndexViewContext, ReadingViewContext, 
RegisterAppContext,
+    ReleaseBufferContext, RequireBufferContext, WritingViewContext,
 };
 use crate::config::Config;
 use crate::error::WorkerError;
@@ -176,6 +176,7 @@ pub trait Store {
         ctx: RequireBufferContext,
     ) -> Result<RequireBufferResponse, WorkerError>;
     async fn release_buffer(&self, ctx: ReleaseBufferContext) -> Result<i64, 
WorkerError>;
+    async fn register_app(&self, ctx: RegisterAppContext) -> Result<()>;
 }
 
 pub trait Persistent {}

Reply via email to