This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d027c0cc [#1407] feat(rust): refactor localfile store to speed up 
writing (#1422)
5d027c0cc is described below

commit 5d027c0ccc4f1c24225e5639f0a44142d4b7b1ae
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Jan 8 11:41:48 2024 +0800

    [#1407] feat(rust): refactor localfile store to speed up writing (#1422)
    
    ### What changes were proposed in this pull request?
    
    1. using `opendal` crate to simplify read/write
    2. avoid using dashmap.get to fix potential deadlock
    
    ### Why are the changes needed?
    
    For: #1407
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit tests
    
    ### Commit logs
    
    1. using opendal crate to simplify read/write
    2. avoid using dashmap.get to fix potential deadlock
---
 .gitignore                                       |   1 -
 rust/experimental/server/Cargo.lock              | 645 +++++++++++++++++++++-
 rust/experimental/server/Cargo.toml              |   1 +
 rust/experimental/server/README.md               |  17 +-
 rust/experimental/server/src/store/hybrid.rs     |  21 +-
 rust/experimental/server/src/store/local/disk.rs | 372 +++++++++++++
 rust/experimental/server/src/store/local/mod.rs  |  18 +
 rust/experimental/server/src/store/localfile.rs  | 670 ++++-------------------
 rust/experimental/server/src/store/mod.rs        |   1 +
 9 files changed, 1151 insertions(+), 595 deletions(-)

diff --git a/.gitignore b/.gitignore
index df4bb57a5..6f7717539 100644
--- a/.gitignore
+++ b/.gitignore
@@ -29,7 +29,6 @@ deploy/kubernetes/docker/hadoopconfig/*
 *.dll
 *.so
 *.dylib
-local
 vendor
 VERSION
 testbin/*
diff --git a/rust/experimental/server/Cargo.lock 
b/rust/experimental/server/Cargo.lock
index 94a281877..0d3e7ac2f 100644
--- a/rust/experimental/server/Cargo.lock
+++ b/rust/experimental/server/Cargo.lock
@@ -38,6 +38,21 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "android-tzdata"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
+
+[[package]]
+name = "android_system_properties"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "anyhow"
 version = "1.0.75"
@@ -61,6 +76,19 @@ dependencies = [
  "futures-core",
 ]
 
+[[package]]
+name = "async-compat"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "f68a707c1feb095d8c07f8a65b9f506b117d30af431cab89374357de7c11461b"
+dependencies = [
+ "futures-core",
+ "futures-io",
+ "once_cell",
+ "pin-project-lite",
+ "tokio",
+]
+
 [[package]]
 name = "async-trait"
 version = "0.1.73"
@@ -157,6 +185,18 @@ dependencies = [
  "tower-service",
 ]
 
+[[package]]
+name = "backon"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "0c1a6197b2120bb2185a267f6515038558b019e92b832bb0320e96d66268dcf9"
+dependencies = [
+ "fastrand 1.9.0",
+ "futures-core",
+ "pin-project 1.1.3",
+ "tokio",
+]
+
 [[package]]
 name = "backtrace"
 version = "0.3.69"
@@ -184,6 +224,12 @@ version = "0.21.4"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2"
 
+[[package]]
+name = "base64ct"
+version = "1.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b"
+
 [[package]]
 name = "bindgen"
 version = "0.64.0"
@@ -293,6 +339,20 @@ version = "1.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
 
+[[package]]
+name = "chrono"
+version = "0.4.31"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38"
+dependencies = [
+ "android-tzdata",
+ "iana-time-zone",
+ "js-sys",
+ "num-traits",
+ "wasm-bindgen",
+ "windows-targets 0.48.5",
+]
+
 [[package]]
 name = "clang-sys"
 version = "1.7.0"
@@ -437,6 +497,32 @@ dependencies = [
  "tracing-subscriber",
 ]
 
+[[package]]
+name = "const-oid"
+version = "0.9.6"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8"
+
+[[package]]
+name = "const-random"
+version = "0.1.17"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "5aaf16c9c2c612020bcfd042e170f6e32de9b9d75adb5277cdbbd2e2c8c8299a"
+dependencies = [
+ "const-random-macro",
+]
+
+[[package]]
+name = "const-random-macro"
+version = "0.1.16"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e"
+dependencies = [
+ "getrandom",
+ "once_cell",
+ "tiny-keccak",
+]
+
 [[package]]
 name = "core-foundation"
 version = "0.9.3"
@@ -607,6 +693,12 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "crunchy"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7"
+
 [[package]]
 name = "crypto-common"
 version = "0.1.6"
@@ -674,6 +766,17 @@ dependencies = [
  "uuid",
 ]
 
+[[package]]
+name = "der"
+version = "0.7.8"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "fffa369a668c8af7dbf8b5e56c9f744fbd399949ed171606040001947de40b1c"
+dependencies = [
+ "const-oid",
+ "pem-rfc7468",
+ "zeroize",
+]
+
 [[package]]
 name = "deranged"
 version = "0.3.8"
@@ -718,7 +821,9 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
 dependencies = [
  "block-buffer",
+ "const-oid",
  "crypto-common",
+ "subtle",
 ]
 
 [[package]]
@@ -741,6 +846,15 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "dlv-list"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f"
+dependencies = [
+ "const-random",
+]
+
 [[package]]
 name = "either"
 version = "1.9.0"
@@ -812,6 +926,15 @@ dependencies = [
  "once_cell",
 ]
 
+[[package]]
+name = "fastrand"
+version = "1.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be"
+dependencies = [
+ "instant",
+]
+
 [[package]]
 name = "fastrand"
 version = "2.0.0"
@@ -836,6 +959,12 @@ version = "0.4.2"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
 
+[[package]]
+name = "flagset"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "d52a7e408202050813e6f1d9addadcaafef3dca7530c7ddfb005d4081cce6779"
+
 [[package]]
 name = "flate2"
 version = "1.0.27"
@@ -1047,8 +1176,10 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427"
 dependencies = [
  "cfg-if",
+ "js-sys",
  "libc",
  "wasi",
+ "wasm-bindgen",
 ]
 
 [[package]]
@@ -1184,6 +1315,15 @@ version = "0.4.3"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
 
+[[package]]
+name = "hmac"
+version = "0.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
+dependencies = [
+ "digest",
+]
+
 [[package]]
 name = "home"
 version = "0.5.5"
@@ -1257,6 +1397,20 @@ dependencies = [
  "want",
 ]
 
+[[package]]
+name = "hyper-rustls"
+version = "0.24.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590"
+dependencies = [
+ "futures-util",
+ "http",
+ "hyper",
+ "rustls 0.21.7",
+ "tokio",
+ "tokio-rustls 0.24.1",
+]
+
 [[package]]
 name = "hyper-timeout"
 version = "0.4.1"
@@ -1282,6 +1436,29 @@ dependencies = [
  "tokio-native-tls",
 ]
 
+[[package]]
+name = "iana-time-zone"
+version = "0.1.59"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "b6a67363e2aa4443928ce15e57ebae94fd8949958fd1223c4cfc0cd473ad7539"
+dependencies = [
+ "android_system_properties",
+ "core-foundation-sys",
+ "iana-time-zone-haiku",
+ "js-sys",
+ "wasm-bindgen",
+ "windows-core",
+]
+
+[[package]]
+name = "iana-time-zone-haiku"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
+dependencies = [
+ "cc",
+]
+
 [[package]]
 name = "ident_case"
 version = "1.0.1"
@@ -1343,11 +1520,20 @@ dependencies = [
  "log",
  "num-format",
  "once_cell",
- "quick-xml",
+ "quick-xml 0.26.0",
  "rgb",
  "str_stack",
 ]
 
+[[package]]
+name = "instant"
+version = "0.1.12"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
+dependencies = [
+ "cfg-if",
+]
+
 [[package]]
 name = "io-lifetimes"
 version = "1.0.11"
@@ -1400,11 +1586,29 @@ dependencies = [
  "wasm-bindgen",
 ]
 
+[[package]]
+name = "jsonwebtoken"
+version = "9.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "5c7ea04a7c5c055c175f189b6dc6ba036fd62306b58c66c9f6389036c503a3f4"
+dependencies = [
+ "base64 0.21.4",
+ "js-sys",
+ "pem",
+ "ring 0.17.7",
+ "serde",
+ "serde_json",
+ "simple_asn1",
+]
+
 [[package]]
 name = "lazy_static"
 version = "1.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
+dependencies = [
+ "spin 0.5.2",
+]
 
 [[package]]
 name = "lazycell"
@@ -1449,6 +1653,12 @@ dependencies = [
  "windows-sys 0.48.0",
 ]
 
+[[package]]
+name = "libm"
+version = "0.2.8"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058"
+
 [[package]]
 name = "linux-raw-sys"
 version = "0.1.4"
@@ -1492,6 +1702,16 @@ version = "0.7.3"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
 
+[[package]]
+name = "md-5"
+version = "0.10.6"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf"
+dependencies = [
+ "cfg-if",
+ "digest",
+]
+
 [[package]]
 name = "memchr"
 version = "2.6.3"
@@ -1604,6 +1824,34 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "num-bigint"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "608e7659b5c3d7cba262d894801b9ec9d00de989e8a82bd4bef91d08da45cdc0"
+dependencies = [
+ "autocfg",
+ "num-integer",
+ "num-traits",
+]
+
+[[package]]
+name = "num-bigint-dig"
+version = "0.8.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "dc84195820f291c7697304f3cbdadd1cb7199c0efc917ff5eafd71225c136151"
+dependencies = [
+ "byteorder",
+ "lazy_static",
+ "libm",
+ "num-integer",
+ "num-iter",
+ "num-traits",
+ "rand 0.8.5",
+ "smallvec",
+ "zeroize",
+]
+
 [[package]]
 name = "num-format"
 version = "0.4.4"
@@ -1614,6 +1862,27 @@ dependencies = [
  "itoa",
 ]
 
+[[package]]
+name = "num-integer"
+version = "0.1.45"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9"
+dependencies = [
+ "autocfg",
+ "num-traits",
+]
+
+[[package]]
+name = "num-iter"
+version = "0.1.43"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "7d03e6c028c5dc5cac6e2dec0efda81fc887605bb3d884578bb6d6bf7514e252"
+dependencies = [
+ "autocfg",
+ "num-integer",
+ "num-traits",
+]
+
 [[package]]
 name = "num-traits"
 version = "0.2.16"
@@ -1621,6 +1890,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2"
 dependencies = [
  "autocfg",
+ "libm",
 ]
 
 [[package]]
@@ -1648,6 +1918,39 @@ version = "1.18.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
 
+[[package]]
+name = "opendal"
+version = "0.44.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "c32736a48ef08a5d2212864e2295c8e54f4d6b352b7f49aa0c29a12fc410ff66"
+dependencies = [
+ "anyhow",
+ "async-compat",
+ "async-trait",
+ "backon",
+ "base64 0.21.4",
+ "bytes 1.5.0",
+ "chrono",
+ "flagset",
+ "futures",
+ "getrandom",
+ "http",
+ "log",
+ "md-5",
+ "once_cell",
+ "parking_lot",
+ "percent-encoding",
+ "pin-project 1.1.3",
+ "quick-xml 0.30.0",
+ "reqsign",
+ "reqwest",
+ "serde",
+ "serde_json",
+ "sha2",
+ "tokio",
+ "uuid",
+]
+
 [[package]]
 name = "openssl"
 version = "0.10.57"
@@ -1692,6 +1995,16 @@ dependencies = [
  "vcpkg",
 ]
 
+[[package]]
+name = "ordered-multimap"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "a4d6a8c22fc714f0c2373e6091bf6f5e9b37b1bc0b1184874b7e0a4e303d318f"
+dependencies = [
+ "dlv-list",
+ "hashbrown 0.14.0",
+]
+
 [[package]]
 name = "os_str_bytes"
 version = "6.5.1"
@@ -1745,6 +2058,25 @@ version = "0.1.2"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
 
+[[package]]
+name = "pem"
+version = "3.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "1b8fcc794035347fb64beda2d3b462595dd2753e3f268d89c5aae77e8cf2c310"
+dependencies = [
+ "base64 0.21.4",
+ "serde",
+]
+
+[[package]]
+name = "pem-rfc7468"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412"
+dependencies = [
+ "base64ct",
+]
+
 [[package]]
 name = "percent-encoding"
 version = "2.3.0"
@@ -1813,6 +2145,27 @@ version = "0.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
 
+[[package]]
+name = "pkcs1"
+version = "0.7.5"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f"
+dependencies = [
+ "der",
+ "pkcs8",
+ "spki",
+]
+
+[[package]]
+name = "pkcs8"
+version = "0.10.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7"
+dependencies = [
+ "der",
+ "spki",
+]
+
 [[package]]
 name = "pkg-config"
 version = "0.3.27"
@@ -2065,6 +2418,26 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "quick-xml"
+version = "0.30.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "eff6510e86862b57b210fd8cbe8ed3f0d7d600b9c2863cd4549a2e033c66e956"
+dependencies = [
+ "memchr",
+ "serde",
+]
+
+[[package]]
+name = "quick-xml"
+version = "0.31.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33"
+dependencies = [
+ "memchr",
+ "serde",
+]
+
 [[package]]
 name = "quote"
 version = "1.0.33"
@@ -2236,6 +2609,38 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "reqsign"
+version = "0.14.6"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "dce87f66ba6c6acef277a729f989a0eca946cb9ce6a15bcc036bda0f72d4b9fd"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "base64 0.21.4",
+ "chrono",
+ "form_urlencoded",
+ "getrandom",
+ "hex",
+ "hmac",
+ "home",
+ "http",
+ "jsonwebtoken",
+ "log",
+ "once_cell",
+ "percent-encoding",
+ "quick-xml 0.31.0",
+ "rand 0.8.5",
+ "reqwest",
+ "rsa",
+ "rust-ini",
+ "serde",
+ "serde_json",
+ "sha1",
+ "sha2",
+ "tokio",
+]
+
 [[package]]
 name = "reqwest"
 version = "0.11.20"
@@ -2251,6 +2656,7 @@ dependencies = [
  "http",
  "http-body",
  "hyper",
+ "hyper-rustls",
  "hyper-tls",
  "ipnet",
  "js-sys",
@@ -2260,15 +2666,21 @@ dependencies = [
  "once_cell",
  "percent-encoding",
  "pin-project-lite",
+ "rustls 0.21.7",
+ "rustls-native-certs",
+ "rustls-pemfile",
  "serde",
  "serde_json",
  "serde_urlencoded",
  "tokio",
  "tokio-native-tls",
+ "tokio-rustls 0.24.1",
+ "tokio-util 0.7.9",
  "tower-service",
  "url",
  "wasm-bindgen",
  "wasm-bindgen-futures",
+ "wasm-streams",
  "web-sys",
  "winreg",
 ]
@@ -2301,11 +2713,25 @@ dependencies = [
  "libc",
  "once_cell",
  "spin 0.5.2",
- "untrusted",
+ "untrusted 0.7.1",
  "web-sys",
  "winapi",
 ]
 
+[[package]]
+name = "ring"
+version = "0.17.7"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "688c63d65483050968b2a8937f7995f443e27041a0f7700aa59b0822aedebb74"
+dependencies = [
+ "cc",
+ "getrandom",
+ "libc",
+ "spin 0.9.8",
+ "untrusted 0.9.0",
+ "windows-sys 0.48.0",
+]
+
 [[package]]
 name = "roxmltree"
 version = "0.18.1"
@@ -2315,6 +2741,36 @@ dependencies = [
  "xmlparser",
 ]
 
+[[package]]
+name = "rsa"
+version = "0.9.6"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "5d0e5124fcb30e76a7e79bfee683a2746db83784b86289f6251b54b7950a0dfc"
+dependencies = [
+ "const-oid",
+ "digest",
+ "num-bigint-dig",
+ "num-integer",
+ "num-traits",
+ "pkcs1",
+ "pkcs8",
+ "rand_core 0.6.4",
+ "signature",
+ "spki",
+ "subtle",
+ "zeroize",
+]
+
+[[package]]
+name = "rust-ini"
+version = "0.20.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "3e0698206bcb8882bf2a9ecb4c1e7785db57ff052297085a6efd4fe42302068a"
+dependencies = [
+ "cfg-if",
+ "ordered-multimap",
+]
+
 [[package]]
 name = "rustc-demangle"
 version = "0.1.23"
@@ -2362,7 +2818,7 @@ checksum = 
"35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7"
 dependencies = [
  "base64 0.13.1",
  "log",
- "ring",
+ "ring 0.16.20",
  "sct 0.6.1",
  "webpki",
 ]
@@ -2374,11 +2830,23 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8"
 dependencies = [
  "log",
- "ring",
+ "ring 0.16.20",
  "rustls-webpki",
  "sct 0.7.0",
 ]
 
+[[package]]
+name = "rustls-native-certs"
+version = "0.6.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00"
+dependencies = [
+ "openssl-probe",
+ "rustls-pemfile",
+ "schannel",
+ "security-framework",
+]
+
 [[package]]
 name = "rustls-pemfile"
 version = "1.0.3"
@@ -2394,8 +2862,8 @@ version = "0.101.6"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "3c7d5dece342910d9ba34d259310cae3e0154b873b35408b787b59bce53d34fe"
 dependencies = [
- "ring",
- "untrusted",
+ "ring 0.16.20",
+ "untrusted 0.7.1",
 ]
 
 [[package]]
@@ -2431,8 +2899,8 @@ version = "0.6.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce"
 dependencies = [
- "ring",
- "untrusted",
+ "ring 0.16.20",
+ "untrusted 0.7.1",
 ]
 
 [[package]]
@@ -2441,8 +2909,8 @@ version = "0.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4"
 dependencies = [
- "ring",
- "untrusted",
+ "ring 0.16.20",
+ "untrusted 0.7.1",
 ]
 
 [[package]]
@@ -2531,6 +2999,17 @@ dependencies = [
  "digest",
 ]
 
+[[package]]
+name = "sha2"
+version = "0.10.8"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8"
+dependencies = [
+ "cfg-if",
+ "cpufeatures",
+ "digest",
+]
+
 [[package]]
 name = "sharded-slab"
 version = "0.1.4"
@@ -2576,6 +3055,28 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "signature"
+version = "2.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de"
+dependencies = [
+ "digest",
+ "rand_core 0.6.4",
+]
+
+[[package]]
+name = "simple_asn1"
+version = "0.6.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "adc4e5204eb1910f40f9cfa375f6f05b68c3abac4b6fd879c8ff5e7ae8a0a085"
+dependencies = [
+ "num-bigint",
+ "num-traits",
+ "thiserror",
+ "time",
+]
+
 [[package]]
 name = "slab"
 version = "0.4.9"
@@ -2626,6 +3127,16 @@ dependencies = [
  "lock_api",
 ]
 
+[[package]]
+name = "spki"
+version = "0.7.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d"
+dependencies = [
+ "base64ct",
+ "der",
+]
+
 [[package]]
 name = "sse-codec"
 version = "0.3.2"
@@ -2662,6 +3173,12 @@ version = "0.10.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
 
+[[package]]
+name = "subtle"
+version = "2.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc"
+
 [[package]]
 name = "symbolic-common"
 version = "10.2.1"
@@ -2730,7 +3247,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef"
 dependencies = [
  "cfg-if",
- "fastrand",
+ "fastrand 2.0.0",
  "redox_syscall 0.3.5",
  "rustix 0.38.14",
  "windows-sys 0.48.0",
@@ -2840,6 +3357,15 @@ dependencies = [
  "time-core",
 ]
 
+[[package]]
+name = "tiny-keccak"
+version = "2.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237"
+dependencies = [
+ "crunchy",
+]
+
 [[package]]
 name = "tinyvec"
 version = "1.6.0"
@@ -3278,6 +3804,7 @@ dependencies = [
  "hyper",
  "log",
  "once_cell",
+ "opendal",
  "pin-project-lite",
  "poem",
  "pprof",
@@ -3315,6 +3842,12 @@ version = "0.7.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
 
+[[package]]
+name = "untrusted"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
+
 [[package]]
 name = "url"
 version = "2.4.1"
@@ -3342,6 +3875,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d"
 dependencies = [
  "getrandom",
+ "serde",
 ]
 
 [[package]]
@@ -3443,6 +3977,19 @@ version = "0.2.87"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1"
 
+[[package]]
+name = "wasm-streams"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "b4609d447824375f43e1ffbc051b50ad8f4b3ae8219680c94452ea05eb240ac7"
+dependencies = [
+ "futures-util",
+ "js-sys",
+ "wasm-bindgen",
+ "wasm-bindgen-futures",
+ "web-sys",
+]
+
 [[package]]
 name = "web-sys"
 version = "0.3.64"
@@ -3459,8 +4006,8 @@ version = "0.21.4"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea"
 dependencies = [
- "ring",
- "untrusted",
+ "ring 0.16.20",
+ "untrusted 0.7.1",
 ]
 
 [[package]]
@@ -3506,6 +4053,15 @@ version = "0.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
 
+[[package]]
+name = "windows-core"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
+dependencies = [
+ "windows-targets 0.52.0",
+]
+
 [[package]]
 name = "windows-sys"
 version = "0.45.0"
@@ -3554,6 +4110,21 @@ dependencies = [
  "windows_x86_64_msvc 0.48.5",
 ]
 
+[[package]]
+name = "windows-targets"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd"
+dependencies = [
+ "windows_aarch64_gnullvm 0.52.0",
+ "windows_aarch64_msvc 0.52.0",
+ "windows_i686_gnu 0.52.0",
+ "windows_i686_msvc 0.52.0",
+ "windows_x86_64_gnu 0.52.0",
+ "windows_x86_64_gnullvm 0.52.0",
+ "windows_x86_64_msvc 0.52.0",
+]
+
 [[package]]
 name = "windows_aarch64_gnullvm"
 version = "0.42.2"
@@ -3566,6 +4137,12 @@ version = "0.48.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8"
 
+[[package]]
+name = "windows_aarch64_gnullvm"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea"
+
 [[package]]
 name = "windows_aarch64_msvc"
 version = "0.42.2"
@@ -3578,6 +4155,12 @@ version = "0.48.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc"
 
+[[package]]
+name = "windows_aarch64_msvc"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef"
+
 [[package]]
 name = "windows_i686_gnu"
 version = "0.42.2"
@@ -3590,6 +4173,12 @@ version = "0.48.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e"
 
+[[package]]
+name = "windows_i686_gnu"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313"
+
 [[package]]
 name = "windows_i686_msvc"
 version = "0.42.2"
@@ -3602,6 +4191,12 @@ version = "0.48.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406"
 
+[[package]]
+name = "windows_i686_msvc"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a"
+
 [[package]]
 name = "windows_x86_64_gnu"
 version = "0.42.2"
@@ -3614,6 +4209,12 @@ version = "0.48.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e"
 
+[[package]]
+name = "windows_x86_64_gnu"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd"
+
 [[package]]
 name = "windows_x86_64_gnullvm"
 version = "0.42.2"
@@ -3626,6 +4227,12 @@ version = "0.48.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc"
 
+[[package]]
+name = "windows_x86_64_gnullvm"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e"
+
 [[package]]
 name = "windows_x86_64_msvc"
 version = "0.42.2"
@@ -3638,6 +4245,12 @@ version = "0.48.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
 
+[[package]]
+name = "windows_x86_64_msvc"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04"
+
 [[package]]
 name = "winnow"
 version = "0.5.15"
@@ -3662,3 +4275,9 @@ name = "xmlparser"
 version = "0.13.6"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4"
+
+[[package]]
+name = "zeroize"
+version = "1.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d"
diff --git a/rust/experimental/server/Cargo.toml 
b/rust/experimental/server/Cargo.toml
index acb082986..ad9f5fc92 100644
--- a/rust/experimental/server/Cargo.toml
+++ b/rust/experimental/server/Cargo.toml
@@ -97,6 +97,7 @@ clap = "3.0.14"
 socket2 = { version="0.4", features = ["all"]}
 cap = "0.1.2"
 spin = "0.9.8"
+opendal = { version = "0.44.0", features = ["services-fs"]}
 
 [dependencies.hdfs-native]
 version = "0.5.0"
diff --git a/rust/experimental/server/README.md 
b/rust/experimental/server/README.md
index 73d48e833..9cec4df48 100644
--- a/rust/experimental/server/README.md
+++ b/rust/experimental/server/README.md
@@ -71,14 +71,15 @@ memory_spill_low_watermark = 0.7
 ``` 
 
 #### TeraSort cost times
-| type/buffer capacity                 | 250G (compressed)  |                  
       comment                          |
-|--------------------------------------|:------------------:|:--------------------------------------------------------:|
-| vanilla uniffle (grpc-based)  / 10g  |  5.3min (2.3m/3m)  |                  
        1.9G/s                          |
-| vanilla uniffle (grpc-based)  / 300g | 5.6min (3.7m/1.9m) |              GC 
occurs frequently / 2.5G/s               |
-| vanilla uniffle (netty-based) / 10g  |         /          | read failed. 
2.5G/s (write is better due to zero copy)   |
-| vanilla uniffle (netty-based) / 300g |         /          |                  
       app hang                         |
-| rust based shuffle server     / 10g  | 4.6min (2.2m/2.4m) |                  
       2.0 G/s                          |
-| rust based shuffle server     / 300g |  4min (1.5m/2.5m)  |                  
       3.5 G/s                          |
+| type/buffer capacity                 | 250G (compressed)  |                  
               comment                                  |
+|--------------------------------------|:------------------:|:------------------------------------------------------------------------:|
+| vanilla spark ess                    |  5.0min (2.2/2.8)  | ess use 400 
nodes but uniffle only one. But the rss speed is still fast! | 
+| vanilla uniffle (grpc-based)  / 10g  |  5.3min (2.3m/3m)  |                  
                1.9G/s                                  |
+| vanilla uniffle (grpc-based)  / 300g | 5.6min (3.7m/1.9m) |                  
    GC occurs frequently / 2.5G/s                       |
+| vanilla uniffle (netty-based) / 10g  |         /          |          read 
failed. 2.5G/s (write is better due to zero copy)          |
+| vanilla uniffle (netty-based) / 300g |         /          |                  
               app hang                                 |
+| rust based shuffle server     / 10g  | 4.6min (2.2m/2.4m) |                  
               2.4 G/s                                  |
+| rust based shuffle server     / 300g |  4min (1.5m/2.5m)  |                  
               3.5 G/s                                  |
 
 
 Compared with grpc based server, rust-based server has less memory footprint 
and stable performance.  
diff --git a/rust/experimental/server/src/store/hybrid.rs 
b/rust/experimental/server/src/store/hybrid.rs
index 32be53511..f9feaa091 100644
--- a/rust/experimental/server/src/store/hybrid.rs
+++ b/rust/experimental/server/src/store/hybrid.rs
@@ -230,7 +230,10 @@ impl HybridStore {
         ctx.data_blocks.sort_by_key(|block| block.task_attempt_id);
 
         // when throwing the data lost error, it should fast fail for this 
partition data.
-        let inserted = candidate_store.insert(ctx).await;
+        let inserted = candidate_store
+            .insert(ctx)
+            .instrument_await("inserting into the persistent store, invoking 
[write]")
+            .await;
         if let Err(err) = inserted {
             match err {
                 WorkerError::PARTIAL_DATA_LOST(msg) => {
@@ -335,15 +338,19 @@ impl Store for HybridStore {
         let store = self.clone();
         let concurrency_limiter =
             Arc::new(Semaphore::new(store.memory_spill_max_concurrency as 
usize));
-        self.runtime_manager.default_runtime.spawn(async move {
+        self.runtime_manager.write_runtime.spawn(async move {
             while let Ok(message) = store.memory_spill_recv.recv().await {
                 let await_root = await_tree_registry
-                    .register(format!("hot->warm flush."))
+                    .register(format!("hot->warm flush. uid: {:#?}", 
&message.ctx.uid))
                     .await;
 
                 // using acquire_owned(), refer to 
https://github.com/tokio-rs/tokio/issues/1998
-                let concurrency_guarder =
-                    concurrency_limiter.clone().acquire_owned().await.unwrap();
+                let concurrency_guarder = concurrency_limiter
+                    .clone()
+                    .acquire_owned()
+                    .instrument_await("waiting for the spill concurrent lock.")
+                    .await
+                    .unwrap();
 
                 TOTAL_MEMORY_SPILL_OPERATION.inc();
                 GAUGE_MEMORY_SPILL_OPERATION.inc();
@@ -358,6 +365,7 @@ impl Store for HybridStore {
                         }
                         match store_cloned
                             
.memory_spill_to_persistent_store(message.ctx.clone(), message.id)
+                            
.instrument_await("memory_spill_to_persistent_store.")
                             .await
                         {
                             Ok(msg) => {
@@ -768,8 +776,9 @@ mod tests {
                     let reading_view_ctx = ReadingViewContext {
                         uid: uid.clone(),
                         reading_options: 
ReadingOptions::FILE_OFFSET_AND_LEN(offset, length as i64),
-                        serialized_expected_task_ids_bitmap: 
Default::default(),
+                        serialized_expected_task_ids_bitmap: None,
                     };
+                    println!("reading. offset: {:?}. len: {:?}", offset, 
length);
                     let read_data = store.get(reading_view_ctx).await.unwrap();
                     match read_data {
                         ResponseData::Local(local_data) => {
diff --git a/rust/experimental/server/src/store/local/disk.rs 
b/rust/experimental/server/src/store/local/disk.rs
new file mode 100644
index 000000000..a0e16af74
--- /dev/null
+++ b/rust/experimental/server/src/store/local/disk.rs
@@ -0,0 +1,372 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::runtime::manager::RuntimeManager;
+use anyhow::{anyhow, Result};
+use await_tree::InstrumentAwait;
+use bytes::{Bytes, BytesMut};
+use futures::AsyncWriteExt;
+use log::{error, info, warn};
+use opendal::services::Fs;
+use opendal::Operator;
+use std::io::SeekFrom;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::io::{AsyncReadExt, AsyncSeekExt};
+use tokio::sync::Semaphore;
+
+pub struct LocalDiskConfig {
+    pub(crate) high_watermark: f32,
+    pub(crate) low_watermark: f32,
+    pub(crate) max_concurrency: i32,
+}
+
+impl LocalDiskConfig {
+    pub fn create_mocked_config() -> Self {
+        LocalDiskConfig {
+            high_watermark: 1.0,
+            low_watermark: 0.6,
+            max_concurrency: 20,
+        }
+    }
+}
+
+impl Default for LocalDiskConfig {
+    fn default() -> Self {
+        LocalDiskConfig {
+            high_watermark: 0.8,
+            low_watermark: 0.6,
+            max_concurrency: 40,
+        }
+    }
+}
+
+pub struct LocalDisk {
+    pub(crate) root: String,
+    operator: Operator,
+    concurrency_limiter: Semaphore,
+    is_corrupted: AtomicBool,
+    is_healthy: AtomicBool,
+    config: LocalDiskConfig,
+}
+
+impl LocalDisk {
+    pub fn new(
+        root: String,
+        config: LocalDiskConfig,
+        runtime_manager: RuntimeManager,
+    ) -> Arc<Self> {
+        let mut builder = Fs::default();
+        builder.root(&root);
+        let operator: Operator = Operator::new(builder).unwrap().finish();
+
+        let instance = LocalDisk {
+            root,
+            operator,
+            concurrency_limiter: Semaphore::new(config.max_concurrency as 
usize),
+            is_corrupted: AtomicBool::new(false),
+            is_healthy: AtomicBool::new(true),
+            config,
+        };
+        let instance = Arc::new(instance);
+
+        let runtime = runtime_manager.default_runtime.clone();
+        let cloned = instance.clone();
+        runtime.spawn(async {
+            info!("Starting the disk healthy check, root: {}", &cloned.root);
+            LocalDisk::loop_check_disk(cloned).await;
+        });
+
+        instance
+    }
+
+    async fn write_read_check(local_disk: Arc<LocalDisk>) -> Result<()> {
+        let temp_path = "corruption_check.file";
+        // cleanup remaining files before checking.
+        local_disk.delete(temp_path).await?;
+
+        let written_data = Bytes::copy_from_slice(b"file corruption check");
+        local_disk.write(written_data.clone(), temp_path).await?;
+        let read_data = local_disk.read(temp_path, 0, None).await?;
+        local_disk.delete(temp_path).await?;
+
+        if written_data != read_data {
+            let msg = format!(
+                "The local disk has been corrupted. path: {}. expected: {:?}, 
actual: {:?}",
+                &local_disk.root, &written_data, &read_data
+            );
+            Err(anyhow!(msg))
+        } else {
+            Ok(())
+        }
+    }
+
+    async fn loop_check_disk(local_disk: Arc<LocalDisk>) {
+        loop {
+            tokio::time::sleep(Duration::from_secs(10)).await;
+
+            if local_disk.is_corrupted().unwrap() {
+                return;
+            }
+
+            let check_succeed: Result<()> = 
LocalDisk::write_read_check(local_disk.clone()).await;
+            if check_succeed.is_err() {
+                local_disk.mark_corrupted();
+                error!(
+                    "Errors on checking local disk corruption. err: {:#?}",
+                    check_succeed.err()
+                );
+            }
+
+            // check the capacity
+            let used_ratio = local_disk.get_disk_used_ratio();
+            if used_ratio.is_err() {
+                error!(
+                    "Errors on getting the used ratio of the disk capacity. 
err: {:?}",
+                    used_ratio.err()
+                );
+                continue;
+            }
+
+            let used_ratio = used_ratio.unwrap();
+            if local_disk.is_healthy().unwrap()
+                && used_ratio > local_disk.config.high_watermark as f64
+            {
+                warn!("Disk={} has been unhealthy.", &local_disk.root);
+                local_disk.mark_unhealthy();
+                continue;
+            }
+
+            if !local_disk.is_healthy().unwrap()
+                && used_ratio < local_disk.config.low_watermark as f64
+            {
+                warn!("Disk={} has been healthy.", &local_disk.root);
+                local_disk.mark_healthy();
+                continue;
+            }
+        }
+    }
+
+    pub async fn create_dir(&self, dir: &str) -> Result<()> {
+        self.operator.create_dir(dir).await?;
+        Ok(())
+    }
+
+    // this will ensure the data flushed into the file
+    async fn write(&self, data: Bytes, path: &str) -> Result<()> {
+        self.operator.write(path, data).await?;
+        Ok(())
+    }
+
+    pub async fn append(&self, data: Bytes, path: &str) -> Result<()> {
+        let _concurrency_guarder = self
+            .concurrency_limiter
+            .acquire()
+            .instrument_await("meet the concurrency limiter")
+            .await?;
+
+        let mut writer = self
+            .operator
+            .writer_with(path)
+            .append(true)
+            .instrument_await("creating the writer...")
+            .await?;
+        // we must use the write_all to ensure the buffer consumed by the OS.
+        // Please see the detail: 
https://doc.rust-lang.org/std/io/trait.Write.html#method.write_all
+        writer
+            .write_all(&*data)
+            .instrument_await("writing the data into buffer...")
+            .await?;
+        writer
+            .flush()
+            .instrument_await("committing the data into file...")
+            .await?;
+
+        Ok(())
+    }
+
+    pub async fn get_file_len(&self, path: &str) -> Result<i64> {
+        match self.operator.stat(path).await {
+            Ok(meta) => Ok(meta.content_length() as i64),
+            Err(_) => Ok(0),
+        }
+    }
+
+    pub async fn read(&self, path: &str, offset: i64, length: Option<i64>) -> 
Result<Bytes> {
+        if length.is_none() {
+            return Ok(Bytes::from(self.operator.read(path).await?));
+        }
+
+        let mut reader = self.operator.reader(path).await?;
+        reader.seek(SeekFrom::Start(offset as u64)).await?;
+
+        let mut buffer = vec![0; length.unwrap() as usize];
+        reader.read_exact(buffer.as_mut()).await?;
+
+        let mut bytes_buffer = BytesMut::new();
+        bytes_buffer.extend_from_slice(&*buffer);
+        Ok(bytes_buffer.freeze())
+    }
+
+    pub async fn delete(&self, path: &str) -> Result<()> {
+        self.operator.remove_all(path).await?;
+        Ok(())
+    }
+
+    fn mark_corrupted(&self) {
+        self.is_corrupted.store(true, Ordering::SeqCst);
+    }
+
+    fn mark_unhealthy(&self) {
+        self.is_healthy.store(false, Ordering::SeqCst);
+    }
+
+    fn mark_healthy(&self) {
+        self.is_healthy.store(true, Ordering::SeqCst);
+    }
+
+    pub fn is_corrupted(&self) -> Result<bool> {
+        Ok(self.is_corrupted.load(Ordering::SeqCst))
+    }
+
+    pub fn is_healthy(&self) -> Result<bool> {
+        Ok(self.is_healthy.load(Ordering::SeqCst))
+    }
+
+    fn get_disk_used_ratio(&self) -> Result<f64> {
+        // Get the total and available space in bytes
+        let available_space = fs2::available_space(&self.root)?;
+        let total_space = fs2::total_space(&self.root)?;
+        Ok(1.0 - (available_space as f64 / total_space as f64))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::runtime::manager::RuntimeManager;
+    use crate::store::local::disk::{LocalDisk, LocalDiskConfig};
+    use bytes::Bytes;
+    use std::time::Duration;
+
+    #[test]
+    fn test_local_disk_delete_operation() {
+        let temp_dir = 
tempdir::TempDir::new("test_local_disk_delete_operation-dir").unwrap();
+        let temp_path = temp_dir.path().to_str().unwrap().to_string();
+
+        println!("init the path: {}", &temp_path);
+
+        let runtime: RuntimeManager = Default::default();
+        let local_disk = LocalDisk::new(
+            temp_path.clone(),
+            LocalDiskConfig::default(),
+            runtime.clone(),
+        );
+
+        let data = b"hello!";
+        runtime.wait(local_disk.create_dir("a/")).unwrap();
+        runtime
+            .wait(local_disk.append(Bytes::copy_from_slice(data), "a/b"))
+            .unwrap();
+
+        assert_eq!(
+            true,
+            runtime
+                .wait(tokio::fs::try_exists(format!(
+                    "{}/{}",
+                    &temp_path,
+                    "a/b".to_string()
+                )))
+                .unwrap()
+        );
+
+        runtime
+            .wait(local_disk.delete("a/"))
+            .expect("TODO: panic message");
+        assert_eq!(
+            false,
+            runtime
+                .wait(tokio::fs::try_exists(format!(
+                    "{}/{}",
+                    &temp_path,
+                    "a/b".to_string()
+                )))
+                .unwrap()
+        );
+    }
+
+    #[test]
+    fn local_disk_corruption_healthy_check() {
+        let temp_dir = tempdir::TempDir::new("test_directory").unwrap();
+        let temp_path = temp_dir.path().to_str().unwrap().to_string();
+
+        let local_disk = LocalDisk::new(
+            temp_path.clone(),
+            LocalDiskConfig::create_mocked_config(),
+            Default::default(),
+        );
+
+        awaitility::at_most(Duration::from_secs(10)).until(|| 
local_disk.is_healthy().unwrap());
+        assert_eq!(false, local_disk.is_corrupted().unwrap());
+    }
+
+    #[test]
+    fn local_disk_test() {
+        let temp_dir = tempdir::TempDir::new("test_directory").unwrap();
+        let temp_path = temp_dir.path().to_str().unwrap().to_string();
+
+        let runtime: RuntimeManager = Default::default();
+        let local_disk = LocalDisk::new(
+            temp_path.clone(),
+            LocalDiskConfig::default(),
+            runtime.clone(),
+        );
+
+        let data = b"Hello, World!";
+
+        let relative_path = "app-id/test_file.txt";
+
+        runtime.wait(local_disk.create_dir("app-id/")).unwrap();
+
+        for _ in 0..2 {
+            let write_result =
+                runtime.wait(local_disk.append(Bytes::copy_from_slice(data), 
relative_path));
+            assert!(write_result.is_ok());
+        }
+
+        let read_result = runtime.wait(local_disk.read(relative_path, 0, 
Some(data.len() as i64)));
+        assert!(read_result.is_ok());
+        let read_data = read_result.unwrap();
+        let expected = b"Hello, World!";
+        assert_eq!(read_data.as_ref(), expected);
+
+        // read the middle word
+        let read_result = runtime.wait(local_disk.read(
+            relative_path,
+            data.len() as i64,
+            Some(data.len() as i64),
+        ));
+        assert_eq!(read_result.unwrap().as_ref(), expected);
+
+        // read all words
+        let read_result = runtime.wait(local_disk.read(relative_path, 0, 
None));
+        let expected = b"Hello, World!Hello, World!";
+        assert_eq!(read_result.unwrap().as_ref(), expected);
+
+        temp_dir.close().unwrap();
+    }
+}
diff --git a/rust/experimental/server/src/store/local/mod.rs 
b/rust/experimental/server/src/store/local/mod.rs
new file mode 100644
index 000000000..63ef41a5b
--- /dev/null
+++ b/rust/experimental/server/src/store/local/mod.rs
@@ -0,0 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod disk;
diff --git a/rust/experimental/server/src/store/localfile.rs 
b/rust/experimental/server/src/store/localfile.rs
index e527d8cdb..cc2a881f6 100644
--- a/rust/experimental/server/src/store/localfile.rs
+++ b/rust/experimental/server/src/store/localfile.rs
@@ -28,38 +28,42 @@ use crate::store::{
     LocalDataIndex, PartitionedLocalData, Persistent, RequireBufferResponse, 
ResponseData,
     ResponseDataIndex, Store,
 };
+use std::ops::Deref;
+use std::path::Path;
 
 use anyhow::Result;
 use async_trait::async_trait;
 use await_tree::InstrumentAwait;
-use bytes::{BufMut, Bytes, BytesMut};
+use bytes::{BufMut, BytesMut};
 use dashmap::DashMap;
 
-use log::{debug, error, info, warn};
+use log::{debug, error, warn};
 
 use crate::runtime::manager::RuntimeManager;
-use std::io::SeekFrom;
-use std::path::Path;
-use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::atomic::{AtomicI64, Ordering};
 use std::sync::Arc;
-use std::time::Duration;
-use tokio::fs::OpenOptions;
-use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
-use tokio::sync::{RwLock, Semaphore};
-
-fn create_directory_if_not_exists(dir_path: &str) {
-    if !std::fs::metadata(dir_path).is_ok() {
-        std::fs::create_dir_all(dir_path).expect("Errors on creating dirs.");
+
+use crate::store::local::disk::{LocalDisk, LocalDiskConfig};
+
+struct LockedObj {
+    disk: Arc<LocalDisk>,
+    pointer: AtomicI64,
+}
+
+impl From<Arc<LocalDisk>> for LockedObj {
+    fn from(value: Arc<LocalDisk>) -> Self {
+        Self {
+            disk: value.clone(),
+            pointer: Default::default(),
+        }
     }
 }
 
 pub struct LocalFileStore {
     local_disks: Vec<Arc<LocalDisk>>,
-    partition_written_disk_map: DashMap<String, DashMap<i32, DashMap<i32, 
Arc<LocalDisk>>>>,
-    partition_file_locks: DashMap<String, Arc<RwLock<()>>>,
     healthy_check_min_disks: i32,
-
     runtime_manager: RuntimeManager,
+    partition_locks: DashMap<String, Arc<LockedObj>>,
 }
 
 impl Persistent for LocalFileStore {}
@@ -81,10 +85,9 @@ impl LocalFileStore {
         }
         LocalFileStore {
             local_disks: local_disk_instances,
-            partition_written_disk_map: DashMap::new(),
-            partition_file_locks: DashMap::new(),
             healthy_check_min_disks: 1,
             runtime_manager,
+            partition_locks: Default::default(),
         }
     }
 
@@ -101,10 +104,9 @@ impl LocalFileStore {
         }
         LocalFileStore {
             local_disks: local_disk_instances,
-            partition_written_disk_map: DashMap::new(),
-            partition_file_locks: DashMap::new(),
             healthy_check_min_disks: 
localfile_config.healthy_check_min_disks.unwrap_or(1),
             runtime_manager,
+            partition_locks: Default::default(),
         }
     }
 
@@ -129,69 +131,6 @@ impl LocalFileStore {
         )
     }
 
-    fn get_app_all_partitions(&self, app_id: &str) -> Vec<(i32, i32)> {
-        let stage_entry = self.partition_written_disk_map.get(app_id);
-        if stage_entry.is_none() {
-            return vec![];
-        }
-
-        let stages = stage_entry.unwrap();
-        let mut partition_ids = vec![];
-        for stage_item in stages.iter() {
-            let (shuffle_id, partitions) = stage_item.pair();
-            for partition_item in partitions.iter() {
-                let (partition_id, _) = partition_item.pair();
-                partition_ids.push((*shuffle_id, *partition_id));
-            }
-        }
-
-        partition_ids
-    }
-
-    fn delete_app(&self, app_id: &str) -> Result<()> {
-        self.partition_written_disk_map.remove(app_id);
-        Ok(())
-    }
-
-    fn get_owned_disk(&self, uid: PartitionedUId) -> Option<Arc<LocalDisk>> {
-        let app_id = uid.app_id;
-        let shuffle_id = uid.shuffle_id;
-        let partition_id = uid.partition_id;
-
-        let shuffle_entry = self
-            .partition_written_disk_map
-            .entry(app_id)
-            .or_insert_with(|| DashMap::new());
-        let partition_entry = shuffle_entry
-            .entry(shuffle_id)
-            .or_insert_with(|| DashMap::new());
-
-        partition_entry
-            .get(&partition_id)
-            .map(|v| v.value().clone())
-    }
-
-    async fn get_or_create_owned_disk(&self, uid: PartitionedUId) -> 
Result<Arc<LocalDisk>> {
-        let uid_ref = &uid.clone();
-        let app_id = uid.app_id;
-        let shuffle_id = uid.shuffle_id;
-        let partition_id = uid.partition_id;
-
-        let shuffle_entry = self
-            .partition_written_disk_map
-            .entry(app_id)
-            .or_insert_with(|| DashMap::new());
-        let partition_entry = shuffle_entry
-            .entry(shuffle_id)
-            .or_insert_with(|| DashMap::new());
-        let local_disk = partition_entry
-            .entry(partition_id)
-            .or_insert(self.select_disk(uid_ref).await?)
-            .clone();
-
-        Ok(local_disk)
-    }
-
     fn healthy_check(&self) -> Result<bool> {
         let mut available = 0;
         for local_disk in &self.local_disks {
@@ -207,7 +146,7 @@ impl LocalFileStore {
         Ok(available > self.healthy_check_min_disks)
     }
 
-    async fn select_disk(&self, uid: &PartitionedUId) -> 
Result<Arc<LocalDisk>, WorkerError> {
+    fn select_disk(&self, uid: &PartitionedUId) -> Result<Arc<LocalDisk>, 
WorkerError> {
         let hash_value = PartitionedUId::get_hash(uid);
 
         let mut candidates = vec![];
@@ -244,36 +183,33 @@ impl Store for LocalFileStore {
         }
 
         let uid = ctx.uid;
-        let _pid = uid.partition_id;
         let (data_file_path, index_file_path) =
             LocalFileStore::gen_relative_path_for_partition(&uid);
-        let local_disk = self.get_or_create_owned_disk(uid.clone()).await?;
-
-        if local_disk.is_corrupted()? {
-            return Err(WorkerError::PARTIAL_DATA_LOST(
-                local_disk.base_path.to_string(),
-            ));
-        }
 
-        let lock_cloned = self
-            .partition_file_locks
+        let mut parent_dir_is_created = false;
+        let locked_obj = self
+            .partition_locks
             .entry(data_file_path.clone())
-            .or_insert_with(|| Arc::new(RwLock::new(())))
+            .or_insert_with(|| {
+                parent_dir_is_created = true;
+                Arc::new(LockedObj::from(self.select_disk(&uid).unwrap()))
+            })
             .clone();
-        let _lock_guard = lock_cloned
-            .write()
-            .instrument_await(format!(
-                "localfile partition file lock. path: {}",
-                &data_file_path
-            ))
-            .await;
 
-        // write index file and data file
-        // todo: split multiple pieces
-        let mut next_offset = local_disk
-            .get_file_len(data_file_path.clone())
-            .instrument_await(format!("getting the file len. path: {}", 
&data_file_path))
-            .await?;
+        let local_disk = &locked_obj.disk;
+        let mut next_offset = locked_obj.pointer.load(Ordering::SeqCst);
+
+        if local_disk.is_corrupted()? {
+            return 
Err(WorkerError::PARTIAL_DATA_LOST(local_disk.root.to_string()));
+        }
+
+        if !parent_dir_is_created {
+            if let Some(path) = Path::new(&data_file_path).parent() {
+                local_disk
+                    .create_dir(format!("{:?}/", 
path.to_str().unwrap()).as_str())
+                    .await?;
+            }
+        }
 
         let mut index_bytes_holder = BytesMut::new();
         let mut data_bytes_holder = BytesMut::new();
@@ -296,25 +232,30 @@ impl Store for LocalFileStore {
             index_bytes_holder.put_i64(task_attempt_id);
 
             let data = block.data;
-            // if get_crc(&data) != crc {
-            //     error!("The crc value is not the same. partition id: {}, 
block id: {}", pid, block_id);
-            // }
 
             data_bytes_holder.extend_from_slice(&data);
             next_offset += length as i64;
         }
 
         local_disk
-            .write(data_bytes_holder.freeze(), data_file_path.clone())
-            .instrument_await(format!("localfile writing data. path: {}", 
data_file_path))
+            .append(data_bytes_holder.freeze(), &data_file_path)
+            .instrument_await(format!("localfile writing data. path: {}", 
&data_file_path))
             .await?;
         local_disk
-            .write(index_bytes_holder.freeze(), index_file_path.clone())
-            .instrument_await(format!("localfile writing index. path: {}", 
data_file_path))
+            .append(index_bytes_holder.freeze(), &index_file_path)
+            .instrument_await(format!(
+                "localfile writing index. path: {}",
+                &index_file_path
+            ))
             .await?;
 
         TOTAL_LOCALFILE_USED.inc_by(total_size as u64);
 
+        locked_obj
+            .deref()
+            .pointer
+            .store(next_offset, Ordering::SeqCst);
+
         Ok(())
     }
 
@@ -333,21 +274,10 @@ impl Store for LocalFileStore {
         }
 
         let (data_file_path, _) = 
LocalFileStore::gen_relative_path_for_partition(&uid);
-        let lock_cloned = self
-            .partition_file_locks
-            .entry(data_file_path.clone())
-            .or_insert_with(|| Arc::new(RwLock::new(())))
-            .clone();
-        let _lock_guard = lock_cloned
-            .read()
-            .instrument_await("getting file read lock")
-            .await;
-
-        let local_disk: Option<Arc<LocalDisk>> = 
self.get_owned_disk(uid.clone());
 
-        if local_disk.is_none() {
+        if !self.partition_locks.contains_key(&data_file_path) {
             warn!(
-                "This should not happen of local disk not found for [{:?}]",
+                "There is no cached data in localfile store for [{:?}]",
                 &uid
             );
             return Ok(ResponseData::Local(PartitionedLocalData {
@@ -355,17 +285,26 @@ impl Store for LocalFileStore {
             }));
         }
 
-        let local_disk = local_disk.unwrap();
+        let locked_object = self
+            .partition_locks
+            .entry(data_file_path.clone())
+            .or_insert_with(|| 
Arc::new(LockedObj::from(self.select_disk(&uid).unwrap())))
+            .clone();
+
+        let local_disk = &locked_object.disk;
 
         if local_disk.is_corrupted()? {
             return Err(WorkerError::LOCAL_DISK_OWNED_BY_PARTITION_CORRUPTED(
-                local_disk.base_path.to_string(),
+                local_disk.root.to_string(),
             ));
         }
 
         let data = local_disk
-            .read(data_file_path, offset, Some(len))
-            .instrument_await("getting data from localfile")
+            .read(&data_file_path, offset, Some(len))
+            .instrument_await(format!(
+                "getting data from localfile: {:?}",
+                &data_file_path
+            ))
             .await?;
         Ok(ResponseData::Local(PartitionedLocalData { data }))
     }
@@ -378,21 +317,9 @@ impl Store for LocalFileStore {
         let (data_file_path, index_file_path) =
             LocalFileStore::gen_relative_path_for_partition(&uid);
 
-        let lock_cloned = self
-            .partition_file_locks
-            .entry(data_file_path.clone())
-            .or_insert_with(|| Arc::new(RwLock::new(())))
-            .clone();
-        let _lock_guard = lock_cloned
-            .read()
-            .instrument_await("waiting file lock to read index data")
-            .await;
-
-        let local_disk: Option<Arc<LocalDisk>> = 
self.get_owned_disk(uid.clone());
-
-        if local_disk.is_none() {
+        if !self.partition_locks.contains_key(&data_file_path) {
             warn!(
-                "This should not happen of local disk not found for [{:?}]",
+                "There is no cached data in localfile store for [{:?}]",
                 &uid
             );
             return Ok(Local(LocalDataIndex {
@@ -401,21 +328,29 @@ impl Store for LocalFileStore {
             }));
         }
 
-        let local_disk = local_disk.unwrap();
+        let locked_object = self
+            .partition_locks
+            .entry(data_file_path.clone())
+            .or_insert_with(|| 
Arc::new(LockedObj::from(self.select_disk(&uid).unwrap())))
+            .clone();
 
+        let local_disk = &locked_object.disk;
         if local_disk.is_corrupted()? {
             return Err(WorkerError::LOCAL_DISK_OWNED_BY_PARTITION_CORRUPTED(
-                local_disk.base_path.to_string(),
+                local_disk.root.to_string(),
             ));
         }
 
         let index_data_result = local_disk
-            .read(index_file_path, 0, None)
-            .instrument_await("reading index data from file")
+            .read(&index_file_path, 0, None)
+            .instrument_await(format!(
+                "reading index data from file: {:?}",
+                &index_file_path
+            ))
             .await?;
         let len = local_disk
-            .get_file_len(data_file_path)
-            .instrument_await("getting file len from file")
+            .get_file_len(&data_file_path)
+            .instrument_await(format!("getting file len from file: {:?}", 
&data_file_path))
             .await?;
         Ok(Local(LocalDataIndex {
             index_data: index_data_result,
@@ -423,17 +358,6 @@ impl Store for LocalFileStore {
         }))
     }
 
-    async fn require_buffer(
-        &self,
-        _ctx: RequireBufferContext,
-    ) -> Result<RequireBufferResponse, WorkerError> {
-        todo!()
-    }
-
-    async fn release_buffer(&self, _ctx: ReleaseBufferContext) -> Result<i64, 
WorkerError> {
-        todo!()
-    }
-
     async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
         let app_id = ctx.app_id;
         let shuffle_id_option = ctx.shuffle_id;
@@ -445,28 +369,18 @@ impl Store for LocalFileStore {
 
         for local_disk_ref in &self.local_disks {
             let disk = local_disk_ref.clone();
-            disk.delete(data_relative_dir_path.to_string()).await?;
+            disk.delete(&data_relative_dir_path).await?;
         }
 
-        if shuffle_id_option.is_none() {
-            let all_partition_ids = self.get_app_all_partitions(&app_id);
-            if all_partition_ids.is_empty() {
-                return Ok(());
-            }
-
-            for (shuffle_id, partition_id) in all_partition_ids.into_iter() {
-                // delete lock
-                let uid = PartitionedUId {
-                    app_id: app_id.clone(),
-                    shuffle_id,
-                    partition_id,
-                };
-                let (data_file_path, _) = 
LocalFileStore::gen_relative_path_for_partition(&uid);
-                self.partition_file_locks.remove(&data_file_path);
-            }
+        let keys_to_delete: Vec<_> = self
+            .partition_locks
+            .iter()
+            .filter(|entry| entry.key().starts_with(&data_relative_dir_path))
+            .map(|entry| entry.key().to_string())
+            .collect();
 
-            // delete disk mapping
-            self.delete_app(&app_id)?;
+        for key in keys_to_delete {
+            self.partition_locks.remove(&key);
         }
 
         Ok(())
@@ -475,280 +389,16 @@ impl Store for LocalFileStore {
     async fn is_healthy(&self) -> Result<bool> {
         self.healthy_check()
     }
-}
-
-struct LocalDiskConfig {
-    high_watermark: f32,
-    low_watermark: f32,
-    max_concurrency: i32,
-}
-
-impl LocalDiskConfig {
-    fn create_mocked_config() -> Self {
-        LocalDiskConfig {
-            high_watermark: 1.0,
-            low_watermark: 0.6,
-            max_concurrency: 20,
-        }
-    }
-}
-
-impl Default for LocalDiskConfig {
-    fn default() -> Self {
-        LocalDiskConfig {
-            high_watermark: 0.8,
-            low_watermark: 0.6,
-            max_concurrency: 40,
-        }
-    }
-}
-
-struct LocalDisk {
-    base_path: String,
-    concurrency_limiter: Semaphore,
-    is_corrupted: AtomicBool,
-    is_healthy: AtomicBool,
-    config: LocalDiskConfig,
-}
-
-impl LocalDisk {
-    fn new(path: String, config: LocalDiskConfig, runtime_manager: 
RuntimeManager) -> Arc<Self> {
-        create_directory_if_not_exists(&path);
-        let instance = LocalDisk {
-            base_path: path,
-            concurrency_limiter: Semaphore::new(config.max_concurrency as 
usize),
-            is_corrupted: AtomicBool::new(false),
-            is_healthy: AtomicBool::new(true),
-            config,
-        };
-        let instance = Arc::new(instance);
-
-        let runtime = runtime_manager.default_runtime.clone();
-        let cloned = instance.clone();
-        runtime.spawn(async {
-            info!(
-                "Starting the disk healthy checking, base path: {}",
-                &cloned.base_path
-            );
-            LocalDisk::loop_check_disk(cloned).await;
-        });
-
-        instance
-    }
-
-    async fn write_read_check(local_disk: Arc<LocalDisk>) -> Result<()> {
-        let temp_path = format!("{}/{}", &local_disk.base_path, 
"corruption_check.file");
-        let data = Bytes::copy_from_slice(b"file corruption check");
-        {
-            let mut file = OpenOptions::new()
-                .write(true)
-                .create(true)
-                .open(&temp_path)
-                .await?;
-            file.write_all(&data).await?;
-            file.flush().await?;
-        }
-
-        let mut read_data = Vec::new();
-        {
-            let mut file = tokio::fs::File::open(&temp_path).await?;
-            file.read_to_end(&mut read_data).await?;
-
-            tokio::fs::remove_file(&temp_path).await?;
-        }
-
-        if data != Bytes::copy_from_slice(&read_data) {
-            local_disk.mark_corrupted();
-            error!(
-                "The local disk has been corrupted. path: {}",
-                &local_disk.base_path
-            );
-        }
-
-        Ok(())
-    }
-
-    async fn loop_check_disk(local_disk: Arc<LocalDisk>) {
-        loop {
-            tokio::time::sleep(Duration::from_secs(10)).await;
-
-            if local_disk.is_corrupted().unwrap() {
-                return;
-            }
-
-            let check_succeed: Result<()> = 
LocalDisk::write_read_check(local_disk.clone()).await;
-            if check_succeed.is_err() {
-                local_disk.mark_corrupted();
-                error!(
-                    "Errors on checking local disk corruption. err: {:#?}",
-                    check_succeed.err()
-                );
-            }
-
-            // check the capacity
-            let used_ratio = local_disk.get_disk_used_ratio();
-            if used_ratio.is_err() {
-                error!(
-                    "Errors on getting the used ratio of the disk capacity. 
err: {:?}",
-                    used_ratio.err()
-                );
-                continue;
-            }
-
-            let used_ratio = used_ratio.unwrap();
-            if local_disk.is_healthy().unwrap()
-                && used_ratio > local_disk.config.high_watermark as f64
-            {
-                warn!("Disk={} has been unhealthy.", &local_disk.base_path);
-                local_disk.mark_unhealthy();
-                continue;
-            }
-
-            if !local_disk.is_healthy().unwrap()
-                && used_ratio < local_disk.config.low_watermark as f64
-            {
-                warn!("Disk={} has been healthy.", &local_disk.base_path);
-                local_disk.mark_healthy();
-                continue;
-            }
-        }
-    }
-
-    fn append_path(&self, path: String) -> String {
-        format!("{}/{}", self.base_path.clone(), path)
-    }
-
-    async fn write(&self, data: Bytes, relative_file_path: String) -> 
Result<()> {
-        let _concurrency_guarder = self
-            .concurrency_limiter
-            .acquire()
-            .instrument_await("meet the concurrency limiter")
-            .await?;
-        let absolute_path = self.append_path(relative_file_path.clone());
-        let path = Path::new(&absolute_path);
-
-        match path.parent() {
-            Some(parent) => {
-                if !parent.exists() {
-                    create_directory_if_not_exists(parent.to_str().unwrap())
-                }
-            }
-            _ => todo!(),
-        }
-
-        debug!("data file: {}", &absolute_path);
-
-        let mut output_file = OpenOptions::new()
-            .append(true)
-            .create(true)
-            .open(absolute_path)
-            .await?;
-        output_file.write_all(data.as_ref()).await?;
-        output_file.flush().await?;
-
-        Ok(())
-    }
 
-    async fn get_file_len(&self, relative_file_path: String) -> Result<i64> {
-        let file_path = self.append_path(relative_file_path);
-
-        Ok(
-            match tokio::fs::metadata(file_path)
-                .instrument_await("getting metadata of path")
-                .await
-            {
-                Ok(metadata) => metadata.len() as i64,
-                _ => 0i64,
-            },
-        )
-    }
-
-    async fn read(
+    async fn require_buffer(
         &self,
-        relative_file_path: String,
-        offset: i64,
-        length: Option<i64>,
-    ) -> Result<Bytes> {
-        let file_path = self.append_path(relative_file_path);
-
-        let file = tokio::fs::File::open(&file_path)
-            .instrument_await(format!("opening file. path: {}", &file_path))
-            .await?;
-
-        let read_len = match length {
-            Some(len) => len,
-            _ => file
-                .metadata()
-                .instrument_await(format!("getting file metadata. path: {}", 
&file_path))
-                .await?
-                .len()
-                .try_into()
-                .unwrap(),
-        } as usize;
-
-        let mut reader = tokio::io::BufReader::new(file);
-        let mut buffer = vec![0; read_len];
-        reader
-            .seek(SeekFrom::Start(offset as u64))
-            .instrument_await(format!(
-                "seeking file [{}:{}] of path: {}",
-                offset, read_len, &file_path
-            ))
-            .await?;
-        reader
-            .read_exact(buffer.as_mut())
-            .instrument_await(format!(
-                "reading data of len: {} from path: {}",
-                read_len, &file_path
-            ))
-            .await?;
-
-        let mut bytes_buffer = BytesMut::new();
-        bytes_buffer.extend_from_slice(&*buffer);
-        Ok(bytes_buffer.freeze())
-    }
-
-    async fn delete(&self, relative_file_path: String) -> Result<()> {
-        let delete_path = self.append_path(relative_file_path);
-        if !tokio::fs::try_exists(&delete_path).await? {
-            info!("The path:{} does not exist, ignore purging.", &delete_path);
-            return Ok(());
-        }
-
-        let metadata = tokio::fs::metadata(&delete_path).await?;
-        if metadata.is_dir() {
-            tokio::fs::remove_dir_all(delete_path).await?;
-        } else {
-            tokio::fs::remove_file(delete_path).await?;
-        }
-        Ok(())
-    }
-
-    fn mark_corrupted(&self) {
-        self.is_corrupted.store(true, Ordering::SeqCst);
-    }
-
-    fn mark_unhealthy(&self) {
-        self.is_healthy.store(false, Ordering::SeqCst);
-    }
-
-    fn mark_healthy(&self) {
-        self.is_healthy.store(true, Ordering::SeqCst);
-    }
-
-    fn is_corrupted(&self) -> Result<bool> {
-        Ok(self.is_corrupted.load(Ordering::SeqCst))
-    }
-
-    fn is_healthy(&self) -> Result<bool> {
-        Ok(self.is_healthy.load(Ordering::SeqCst))
+        _ctx: RequireBufferContext,
+    ) -> Result<RequireBufferResponse, WorkerError> {
+        todo!()
     }
 
-    fn get_disk_used_ratio(&self) -> Result<f64> {
-        // Get the total and available space in bytes
-        let available_space = fs2::available_space(&self.base_path)?;
-        let total_space = fs2::total_space(&self.base_path)?;
-        Ok(1.0 - (available_space as f64 / total_space as f64))
+    async fn release_buffer(&self, _ctx: ReleaseBufferContext) -> Result<i64, 
WorkerError> {
+        todo!()
     }
 }
 
@@ -758,17 +408,12 @@ mod test {
         PartitionedUId, PurgeDataContext, ReadingIndexViewContext, 
ReadingOptions,
         ReadingViewContext, WritingViewContext,
     };
-    use crate::store::localfile::{LocalDisk, LocalDiskConfig, LocalFileStore};
+    use crate::store::localfile::LocalFileStore;
 
     use crate::store::{PartitionedDataBlock, ResponseData, ResponseDataIndex, 
Store};
     use bytes::{Buf, Bytes, BytesMut};
     use log::info;
 
-    use crate::runtime::manager::RuntimeManager;
-    use std::io::Read;
-    use std::thread;
-    use std::time::Duration;
-
     #[test]
     fn purge_test() -> anyhow::Result<()> {
         let temp_dir = tempdir::TempDir::new("test_local_store").unwrap();
@@ -840,10 +485,6 @@ mod test {
             false,
             runtime.wait(tokio::fs::try_exists(format!("{}/{}", &temp_path, 
&app_id)))?
         );
-        assert!(!local_store
-            .partition_file_locks
-            .contains_key(&format!("{}/{}/{}/{}.data", &temp_path, &app_id, 0, 
0)));
-        assert!(!local_store.partition_written_disk_map.contains_key(&app_id));
 
         Ok(())
     }
@@ -968,109 +609,4 @@ mod test {
 
         temp_dir.close().unwrap();
     }
-
-    #[test]
-    fn test_local_disk_delete_operation() {
-        let temp_dir = 
tempdir::TempDir::new("test_local_disk_delete_operation-dir").unwrap();
-        let temp_path = temp_dir.path().to_str().unwrap().to_string();
-
-        println!("init the path: {}", &temp_path);
-
-        let runtime: RuntimeManager = Default::default();
-        let local_disk = LocalDisk::new(
-            temp_path.clone(),
-            LocalDiskConfig::default(),
-            runtime.clone(),
-        );
-
-        let data = b"hello!";
-        runtime
-            .wait(local_disk.write(Bytes::copy_from_slice(data), 
"a/b".to_string()))
-            .unwrap();
-
-        assert_eq!(
-            true,
-            runtime
-                .wait(tokio::fs::try_exists(format!(
-                    "{}/{}",
-                    &temp_path,
-                    "a/b".to_string()
-                )))
-                .unwrap()
-        );
-
-        runtime
-            .wait(local_disk.delete("a/".to_string()))
-            .expect("TODO: panic message");
-        assert_eq!(
-            false,
-            runtime
-                .wait(tokio::fs::try_exists(format!(
-                    "{}/{}",
-                    &temp_path,
-                    "a/b".to_string()
-                )))
-                .unwrap()
-        );
-    }
-
-    #[test]
-    fn local_disk_corruption_healthy_check() {
-        let temp_dir = tempdir::TempDir::new("test_directory").unwrap();
-        let temp_path = temp_dir.path().to_str().unwrap().to_string();
-
-        let local_disk = LocalDisk::new(
-            temp_path.clone(),
-            LocalDiskConfig::create_mocked_config(),
-            Default::default(),
-        );
-
-        thread::sleep(Duration::from_secs(12));
-        assert_eq!(true, local_disk.is_healthy().unwrap());
-        assert_eq!(false, local_disk.is_corrupted().unwrap());
-    }
-
-    #[test]
-    fn local_disk_test() {
-        let temp_dir = tempdir::TempDir::new("test_directory").unwrap();
-        let temp_path = temp_dir.path().to_str().unwrap().to_string();
-
-        let runtime: RuntimeManager = Default::default();
-        let local_disk = LocalDisk::new(
-            temp_path.clone(),
-            LocalDiskConfig::default(),
-            runtime.clone(),
-        );
-
-        let data = b"Hello, World!";
-
-        let relative_path = "app-id/test_file.txt";
-        let write_result =
-            runtime.wait(local_disk.write(Bytes::copy_from_slice(data), 
relative_path.to_string()));
-        assert!(write_result.is_ok());
-
-        // test whether the content is written
-        let file_path = format!("{}/{}", local_disk.base_path, relative_path);
-        let mut file = std::fs::File::open(file_path).unwrap();
-        let mut file_content = Vec::new();
-        file.read_to_end(&mut file_content).unwrap();
-        assert_eq!(file_content, data);
-
-        // if the file has been created, append some content
-        let write_result =
-            runtime.wait(local_disk.write(Bytes::copy_from_slice(data), 
relative_path.to_string()));
-        assert!(write_result.is_ok());
-
-        let read_result = runtime.wait(local_disk.read(
-            relative_path.to_string(),
-            0,
-            Some(data.len() as i64 * 2),
-        ));
-        assert!(read_result.is_ok());
-        let read_data = read_result.unwrap();
-        let expected = b"Hello, World!Hello, World!";
-        assert_eq!(read_data.as_ref(), expected);
-
-        temp_dir.close().unwrap();
-    }
 }
diff --git a/rust/experimental/server/src/store/mod.rs 
b/rust/experimental/server/src/store/mod.rs
index f29c7bf56..caf00a068 100644
--- a/rust/experimental/server/src/store/mod.rs
+++ b/rust/experimental/server/src/store/mod.rs
@@ -18,6 +18,7 @@
 #[cfg(feature = "hdfs")]
 pub mod hdfs;
 pub mod hybrid;
+pub mod local;
 pub mod localfile;
 pub mod mem;
 pub mod memory;

Reply via email to