This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 5d027c0cc [#1407] feat(rust): refactor localfile store to speed up writing (#1422)
5d027c0cc is described below
commit 5d027c0ccc4f1c24225e5639f0a44142d4b7b1ae
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Jan 8 11:41:48 2024 +0800
[#1407] feat(rust): refactor localfile store to speed up writing (#1422)
### What changes were proposed in this pull request?
1. Use the `opendal` crate to simplify reads and writes.
2. Avoid using `dashmap.get` to fix a potential deadlock.
### Why are the changes needed?
For: #1407
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests
### Commit logs
1. Use the `opendal` crate to simplify reads and writes.
2. Avoid using `dashmap.get` to fix a potential deadlock.
---
.gitignore | 1 -
rust/experimental/server/Cargo.lock | 645 +++++++++++++++++++++-
rust/experimental/server/Cargo.toml | 1 +
rust/experimental/server/README.md | 17 +-
rust/experimental/server/src/store/hybrid.rs | 21 +-
rust/experimental/server/src/store/local/disk.rs | 372 +++++++++++++
rust/experimental/server/src/store/local/mod.rs | 18 +
rust/experimental/server/src/store/localfile.rs | 670 ++++-------------------
rust/experimental/server/src/store/mod.rs | 1 +
9 files changed, 1151 insertions(+), 595 deletions(-)
diff --git a/.gitignore b/.gitignore
index df4bb57a5..6f7717539 100644
--- a/.gitignore
+++ b/.gitignore
@@ -29,7 +29,6 @@ deploy/kubernetes/docker/hadoopconfig/*
*.dll
*.so
*.dylib
-local
vendor
VERSION
testbin/*
diff --git a/rust/experimental/server/Cargo.lock b/rust/experimental/server/Cargo.lock
index 94a281877..0d3e7ac2f 100644
--- a/rust/experimental/server/Cargo.lock
+++ b/rust/experimental/server/Cargo.lock
@@ -38,6 +38,21 @@ dependencies = [
"memchr",
]
+[[package]]
+name = "android-tzdata"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
+
+[[package]]
+name = "android_system_properties"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
+dependencies = [
+ "libc",
+]
+
[[package]]
name = "anyhow"
version = "1.0.75"
@@ -61,6 +76,19 @@ dependencies = [
"futures-core",
]
+[[package]]
+name = "async-compat"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f68a707c1feb095d8c07f8a65b9f506b117d30af431cab89374357de7c11461b"
+dependencies = [
+ "futures-core",
+ "futures-io",
+ "once_cell",
+ "pin-project-lite",
+ "tokio",
+]
+
[[package]]
name = "async-trait"
version = "0.1.73"
@@ -157,6 +185,18 @@ dependencies = [
"tower-service",
]
+[[package]]
+name = "backon"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0c1a6197b2120bb2185a267f6515038558b019e92b832bb0320e96d66268dcf9"
+dependencies = [
+ "fastrand 1.9.0",
+ "futures-core",
+ "pin-project 1.1.3",
+ "tokio",
+]
+
[[package]]
name = "backtrace"
version = "0.3.69"
@@ -184,6 +224,12 @@ version = "0.21.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2"
+[[package]]
+name = "base64ct"
+version = "1.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b"
+
[[package]]
name = "bindgen"
version = "0.64.0"
@@ -293,6 +339,20 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
+[[package]]
+name = "chrono"
+version = "0.4.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38"
+dependencies = [
+ "android-tzdata",
+ "iana-time-zone",
+ "js-sys",
+ "num-traits",
+ "wasm-bindgen",
+ "windows-targets 0.48.5",
+]
+
[[package]]
name = "clang-sys"
version = "1.7.0"
@@ -437,6 +497,32 @@ dependencies = [
"tracing-subscriber",
]
+[[package]]
+name = "const-oid"
+version = "0.9.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8"
+
+[[package]]
+name = "const-random"
+version = "0.1.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5aaf16c9c2c612020bcfd042e170f6e32de9b9d75adb5277cdbbd2e2c8c8299a"
+dependencies = [
+ "const-random-macro",
+]
+
+[[package]]
+name = "const-random-macro"
+version = "0.1.16"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e"
+dependencies = [
+ "getrandom",
+ "once_cell",
+ "tiny-keccak",
+]
+
[[package]]
name = "core-foundation"
version = "0.9.3"
@@ -607,6 +693,12 @@ dependencies = [
"winapi",
]
+[[package]]
+name = "crunchy"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7"
+
[[package]]
name = "crypto-common"
version = "0.1.6"
@@ -674,6 +766,17 @@ dependencies = [
"uuid",
]
+[[package]]
+name = "der"
+version = "0.7.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fffa369a668c8af7dbf8b5e56c9f744fbd399949ed171606040001947de40b1c"
+dependencies = [
+ "const-oid",
+ "pem-rfc7468",
+ "zeroize",
+]
+
[[package]]
name = "deranged"
version = "0.3.8"
@@ -718,7 +821,9 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
dependencies = [
"block-buffer",
+ "const-oid",
"crypto-common",
+ "subtle",
]
[[package]]
@@ -741,6 +846,15 @@ dependencies = [
"winapi",
]
+[[package]]
+name = "dlv-list"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f"
+dependencies = [
+ "const-random",
+]
+
[[package]]
name = "either"
version = "1.9.0"
@@ -812,6 +926,15 @@ dependencies = [
"once_cell",
]
+[[package]]
+name = "fastrand"
+version = "1.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be"
+dependencies = [
+ "instant",
+]
+
[[package]]
name = "fastrand"
version = "2.0.0"
@@ -836,6 +959,12 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
+[[package]]
+name = "flagset"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d52a7e408202050813e6f1d9addadcaafef3dca7530c7ddfb005d4081cce6779"
+
[[package]]
name = "flate2"
version = "1.0.27"
@@ -1047,8 +1176,10 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427"
dependencies = [
"cfg-if",
+ "js-sys",
"libc",
"wasi",
+ "wasm-bindgen",
]
[[package]]
@@ -1184,6 +1315,15 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
+[[package]]
+name = "hmac"
+version = "0.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
+dependencies = [
+ "digest",
+]
+
[[package]]
name = "home"
version = "0.5.5"
@@ -1257,6 +1397,20 @@ dependencies = [
"want",
]
+[[package]]
+name = "hyper-rustls"
+version = "0.24.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590"
+dependencies = [
+ "futures-util",
+ "http",
+ "hyper",
+ "rustls 0.21.7",
+ "tokio",
+ "tokio-rustls 0.24.1",
+]
+
[[package]]
name = "hyper-timeout"
version = "0.4.1"
@@ -1282,6 +1436,29 @@ dependencies = [
"tokio-native-tls",
]
+[[package]]
+name = "iana-time-zone"
+version = "0.1.59"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b6a67363e2aa4443928ce15e57ebae94fd8949958fd1223c4cfc0cd473ad7539"
+dependencies = [
+ "android_system_properties",
+ "core-foundation-sys",
+ "iana-time-zone-haiku",
+ "js-sys",
+ "wasm-bindgen",
+ "windows-core",
+]
+
+[[package]]
+name = "iana-time-zone-haiku"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
+dependencies = [
+ "cc",
+]
+
[[package]]
name = "ident_case"
version = "1.0.1"
@@ -1343,11 +1520,20 @@ dependencies = [
"log",
"num-format",
"once_cell",
- "quick-xml",
+ "quick-xml 0.26.0",
"rgb",
"str_stack",
]
+[[package]]
+name = "instant"
+version = "0.1.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
+dependencies = [
+ "cfg-if",
+]
+
[[package]]
name = "io-lifetimes"
version = "1.0.11"
@@ -1400,11 +1586,29 @@ dependencies = [
"wasm-bindgen",
]
+[[package]]
+name = "jsonwebtoken"
+version = "9.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5c7ea04a7c5c055c175f189b6dc6ba036fd62306b58c66c9f6389036c503a3f4"
+dependencies = [
+ "base64 0.21.4",
+ "js-sys",
+ "pem",
+ "ring 0.17.7",
+ "serde",
+ "serde_json",
+ "simple_asn1",
+]
+
[[package]]
name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
+dependencies = [
+ "spin 0.5.2",
+]
[[package]]
name = "lazycell"
@@ -1449,6 +1653,12 @@ dependencies = [
"windows-sys 0.48.0",
]
+[[package]]
+name = "libm"
+version = "0.2.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058"
+
[[package]]
name = "linux-raw-sys"
version = "0.1.4"
@@ -1492,6 +1702,16 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
+[[package]]
+name = "md-5"
+version = "0.10.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf"
+dependencies = [
+ "cfg-if",
+ "digest",
+]
+
[[package]]
name = "memchr"
version = "2.6.3"
@@ -1604,6 +1824,34 @@ dependencies = [
"winapi",
]
+[[package]]
+name = "num-bigint"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "608e7659b5c3d7cba262d894801b9ec9d00de989e8a82bd4bef91d08da45cdc0"
+dependencies = [
+ "autocfg",
+ "num-integer",
+ "num-traits",
+]
+
+[[package]]
+name = "num-bigint-dig"
+version = "0.8.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dc84195820f291c7697304f3cbdadd1cb7199c0efc917ff5eafd71225c136151"
+dependencies = [
+ "byteorder",
+ "lazy_static",
+ "libm",
+ "num-integer",
+ "num-iter",
+ "num-traits",
+ "rand 0.8.5",
+ "smallvec",
+ "zeroize",
+]
+
[[package]]
name = "num-format"
version = "0.4.4"
@@ -1614,6 +1862,27 @@ dependencies = [
"itoa",
]
+[[package]]
+name = "num-integer"
+version = "0.1.45"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9"
+dependencies = [
+ "autocfg",
+ "num-traits",
+]
+
+[[package]]
+name = "num-iter"
+version = "0.1.43"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7d03e6c028c5dc5cac6e2dec0efda81fc887605bb3d884578bb6d6bf7514e252"
+dependencies = [
+ "autocfg",
+ "num-integer",
+ "num-traits",
+]
+
[[package]]
name = "num-traits"
version = "0.2.16"
@@ -1621,6 +1890,7 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2"
dependencies = [
"autocfg",
+ "libm",
]
[[package]]
@@ -1648,6 +1918,39 @@ version = "1.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
+[[package]]
+name = "opendal"
+version = "0.44.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c32736a48ef08a5d2212864e2295c8e54f4d6b352b7f49aa0c29a12fc410ff66"
+dependencies = [
+ "anyhow",
+ "async-compat",
+ "async-trait",
+ "backon",
+ "base64 0.21.4",
+ "bytes 1.5.0",
+ "chrono",
+ "flagset",
+ "futures",
+ "getrandom",
+ "http",
+ "log",
+ "md-5",
+ "once_cell",
+ "parking_lot",
+ "percent-encoding",
+ "pin-project 1.1.3",
+ "quick-xml 0.30.0",
+ "reqsign",
+ "reqwest",
+ "serde",
+ "serde_json",
+ "sha2",
+ "tokio",
+ "uuid",
+]
+
[[package]]
name = "openssl"
version = "0.10.57"
@@ -1692,6 +1995,16 @@ dependencies = [
"vcpkg",
]
+[[package]]
+name = "ordered-multimap"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a4d6a8c22fc714f0c2373e6091bf6f5e9b37b1bc0b1184874b7e0a4e303d318f"
+dependencies = [
+ "dlv-list",
+ "hashbrown 0.14.0",
+]
+
[[package]]
name = "os_str_bytes"
version = "6.5.1"
@@ -1745,6 +2058,25 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
+[[package]]
+name = "pem"
+version = "3.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1b8fcc794035347fb64beda2d3b462595dd2753e3f268d89c5aae77e8cf2c310"
+dependencies = [
+ "base64 0.21.4",
+ "serde",
+]
+
+[[package]]
+name = "pem-rfc7468"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412"
+dependencies = [
+ "base64ct",
+]
+
[[package]]
name = "percent-encoding"
version = "2.3.0"
@@ -1813,6 +2145,27 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
+[[package]]
+name = "pkcs1"
+version = "0.7.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f"
+dependencies = [
+ "der",
+ "pkcs8",
+ "spki",
+]
+
+[[package]]
+name = "pkcs8"
+version = "0.10.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7"
+dependencies = [
+ "der",
+ "spki",
+]
+
[[package]]
name = "pkg-config"
version = "0.3.27"
@@ -2065,6 +2418,26 @@ dependencies = [
"memchr",
]
+[[package]]
+name = "quick-xml"
+version = "0.30.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eff6510e86862b57b210fd8cbe8ed3f0d7d600b9c2863cd4549a2e033c66e956"
+dependencies = [
+ "memchr",
+ "serde",
+]
+
+[[package]]
+name = "quick-xml"
+version = "0.31.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33"
+dependencies = [
+ "memchr",
+ "serde",
+]
+
[[package]]
name = "quote"
version = "1.0.33"
@@ -2236,6 +2609,38 @@ dependencies = [
"winapi",
]
+[[package]]
+name = "reqsign"
+version = "0.14.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dce87f66ba6c6acef277a729f989a0eca946cb9ce6a15bcc036bda0f72d4b9fd"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "base64 0.21.4",
+ "chrono",
+ "form_urlencoded",
+ "getrandom",
+ "hex",
+ "hmac",
+ "home",
+ "http",
+ "jsonwebtoken",
+ "log",
+ "once_cell",
+ "percent-encoding",
+ "quick-xml 0.31.0",
+ "rand 0.8.5",
+ "reqwest",
+ "rsa",
+ "rust-ini",
+ "serde",
+ "serde_json",
+ "sha1",
+ "sha2",
+ "tokio",
+]
+
[[package]]
name = "reqwest"
version = "0.11.20"
@@ -2251,6 +2656,7 @@ dependencies = [
"http",
"http-body",
"hyper",
+ "hyper-rustls",
"hyper-tls",
"ipnet",
"js-sys",
@@ -2260,15 +2666,21 @@ dependencies = [
"once_cell",
"percent-encoding",
"pin-project-lite",
+ "rustls 0.21.7",
+ "rustls-native-certs",
+ "rustls-pemfile",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-native-tls",
+ "tokio-rustls 0.24.1",
+ "tokio-util 0.7.9",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
+ "wasm-streams",
"web-sys",
"winreg",
]
@@ -2301,11 +2713,25 @@ dependencies = [
"libc",
"once_cell",
"spin 0.5.2",
- "untrusted",
+ "untrusted 0.7.1",
"web-sys",
"winapi",
]
+[[package]]
+name = "ring"
+version = "0.17.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "688c63d65483050968b2a8937f7995f443e27041a0f7700aa59b0822aedebb74"
+dependencies = [
+ "cc",
+ "getrandom",
+ "libc",
+ "spin 0.9.8",
+ "untrusted 0.9.0",
+ "windows-sys 0.48.0",
+]
+
[[package]]
name = "roxmltree"
version = "0.18.1"
@@ -2315,6 +2741,36 @@ dependencies = [
"xmlparser",
]
+[[package]]
+name = "rsa"
+version = "0.9.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5d0e5124fcb30e76a7e79bfee683a2746db83784b86289f6251b54b7950a0dfc"
+dependencies = [
+ "const-oid",
+ "digest",
+ "num-bigint-dig",
+ "num-integer",
+ "num-traits",
+ "pkcs1",
+ "pkcs8",
+ "rand_core 0.6.4",
+ "signature",
+ "spki",
+ "subtle",
+ "zeroize",
+]
+
+[[package]]
+name = "rust-ini"
+version = "0.20.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3e0698206bcb8882bf2a9ecb4c1e7785db57ff052297085a6efd4fe42302068a"
+dependencies = [
+ "cfg-if",
+ "ordered-multimap",
+]
+
[[package]]
name = "rustc-demangle"
version = "0.1.23"
@@ -2362,7 +2818,7 @@ checksum =
"35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7"
dependencies = [
"base64 0.13.1",
"log",
- "ring",
+ "ring 0.16.20",
"sct 0.6.1",
"webpki",
]
@@ -2374,11 +2830,23 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8"
dependencies = [
"log",
- "ring",
+ "ring 0.16.20",
"rustls-webpki",
"sct 0.7.0",
]
+[[package]]
+name = "rustls-native-certs"
+version = "0.6.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00"
+dependencies = [
+ "openssl-probe",
+ "rustls-pemfile",
+ "schannel",
+ "security-framework",
+]
+
[[package]]
name = "rustls-pemfile"
version = "1.0.3"
@@ -2394,8 +2862,8 @@ version = "0.101.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c7d5dece342910d9ba34d259310cae3e0154b873b35408b787b59bce53d34fe"
dependencies = [
- "ring",
- "untrusted",
+ "ring 0.16.20",
+ "untrusted 0.7.1",
]
[[package]]
@@ -2431,8 +2899,8 @@ version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce"
dependencies = [
- "ring",
- "untrusted",
+ "ring 0.16.20",
+ "untrusted 0.7.1",
]
[[package]]
@@ -2441,8 +2909,8 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4"
dependencies = [
- "ring",
- "untrusted",
+ "ring 0.16.20",
+ "untrusted 0.7.1",
]
[[package]]
@@ -2531,6 +2999,17 @@ dependencies = [
"digest",
]
+[[package]]
+name = "sha2"
+version = "0.10.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8"
+dependencies = [
+ "cfg-if",
+ "cpufeatures",
+ "digest",
+]
+
[[package]]
name = "sharded-slab"
version = "0.1.4"
@@ -2576,6 +3055,28 @@ dependencies = [
"libc",
]
+[[package]]
+name = "signature"
+version = "2.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de"
+dependencies = [
+ "digest",
+ "rand_core 0.6.4",
+]
+
+[[package]]
+name = "simple_asn1"
+version = "0.6.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "adc4e5204eb1910f40f9cfa375f6f05b68c3abac4b6fd879c8ff5e7ae8a0a085"
+dependencies = [
+ "num-bigint",
+ "num-traits",
+ "thiserror",
+ "time",
+]
+
[[package]]
name = "slab"
version = "0.4.9"
@@ -2626,6 +3127,16 @@ dependencies = [
"lock_api",
]
+[[package]]
+name = "spki"
+version = "0.7.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d"
+dependencies = [
+ "base64ct",
+ "der",
+]
+
[[package]]
name = "sse-codec"
version = "0.3.2"
@@ -2662,6 +3173,12 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
+[[package]]
+name = "subtle"
+version = "2.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc"
+
[[package]]
name = "symbolic-common"
version = "10.2.1"
@@ -2730,7 +3247,7 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef"
dependencies = [
"cfg-if",
- "fastrand",
+ "fastrand 2.0.0",
"redox_syscall 0.3.5",
"rustix 0.38.14",
"windows-sys 0.48.0",
@@ -2840,6 +3357,15 @@ dependencies = [
"time-core",
]
+[[package]]
+name = "tiny-keccak"
+version = "2.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237"
+dependencies = [
+ "crunchy",
+]
+
[[package]]
name = "tinyvec"
version = "1.6.0"
@@ -3278,6 +3804,7 @@ dependencies = [
"hyper",
"log",
"once_cell",
+ "opendal",
"pin-project-lite",
"poem",
"pprof",
@@ -3315,6 +3842,12 @@ version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
+[[package]]
+name = "untrusted"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
+
[[package]]
name = "url"
version = "2.4.1"
@@ -3342,6 +3875,7 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d"
dependencies = [
"getrandom",
+ "serde",
]
[[package]]
@@ -3443,6 +3977,19 @@ version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1"
+[[package]]
+name = "wasm-streams"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b4609d447824375f43e1ffbc051b50ad8f4b3ae8219680c94452ea05eb240ac7"
+dependencies = [
+ "futures-util",
+ "js-sys",
+ "wasm-bindgen",
+ "wasm-bindgen-futures",
+ "web-sys",
+]
+
[[package]]
name = "web-sys"
version = "0.3.64"
@@ -3459,8 +4006,8 @@ version = "0.21.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea"
dependencies = [
- "ring",
- "untrusted",
+ "ring 0.16.20",
+ "untrusted 0.7.1",
]
[[package]]
@@ -3506,6 +4053,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
+[[package]]
+name = "windows-core"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
+dependencies = [
+ "windows-targets 0.52.0",
+]
+
[[package]]
name = "windows-sys"
version = "0.45.0"
@@ -3554,6 +4110,21 @@ dependencies = [
"windows_x86_64_msvc 0.48.5",
]
+[[package]]
+name = "windows-targets"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd"
+dependencies = [
+ "windows_aarch64_gnullvm 0.52.0",
+ "windows_aarch64_msvc 0.52.0",
+ "windows_i686_gnu 0.52.0",
+ "windows_i686_msvc 0.52.0",
+ "windows_x86_64_gnu 0.52.0",
+ "windows_x86_64_gnullvm 0.52.0",
+ "windows_x86_64_msvc 0.52.0",
+]
+
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.42.2"
@@ -3566,6 +4137,12 @@ version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8"
+[[package]]
+name = "windows_aarch64_gnullvm"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea"
+
[[package]]
name = "windows_aarch64_msvc"
version = "0.42.2"
@@ -3578,6 +4155,12 @@ version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc"
+[[package]]
+name = "windows_aarch64_msvc"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef"
+
[[package]]
name = "windows_i686_gnu"
version = "0.42.2"
@@ -3590,6 +4173,12 @@ version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e"
+[[package]]
+name = "windows_i686_gnu"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313"
+
[[package]]
name = "windows_i686_msvc"
version = "0.42.2"
@@ -3602,6 +4191,12 @@ version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406"
+[[package]]
+name = "windows_i686_msvc"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a"
+
[[package]]
name = "windows_x86_64_gnu"
version = "0.42.2"
@@ -3614,6 +4209,12 @@ version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e"
+[[package]]
+name = "windows_x86_64_gnu"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd"
+
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.42.2"
@@ -3626,6 +4227,12 @@ version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc"
+[[package]]
+name = "windows_x86_64_gnullvm"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e"
+
[[package]]
name = "windows_x86_64_msvc"
version = "0.42.2"
@@ -3638,6 +4245,12 @@ version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
+[[package]]
+name = "windows_x86_64_msvc"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04"
+
[[package]]
name = "winnow"
version = "0.5.15"
@@ -3662,3 +4275,9 @@ name = "xmlparser"
version = "0.13.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4"
+
+[[package]]
+name = "zeroize"
+version = "1.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d"
diff --git a/rust/experimental/server/Cargo.toml b/rust/experimental/server/Cargo.toml
index acb082986..ad9f5fc92 100644
--- a/rust/experimental/server/Cargo.toml
+++ b/rust/experimental/server/Cargo.toml
@@ -97,6 +97,7 @@ clap = "3.0.14"
socket2 = { version="0.4", features = ["all"]}
cap = "0.1.2"
spin = "0.9.8"
+opendal = { version = "0.44.0", features = ["services-fs"]}
[dependencies.hdfs-native]
version = "0.5.0"
diff --git a/rust/experimental/server/README.md b/rust/experimental/server/README.md
index 73d48e833..9cec4df48 100644
--- a/rust/experimental/server/README.md
+++ b/rust/experimental/server/README.md
@@ -71,14 +71,15 @@ memory_spill_low_watermark = 0.7
```
#### TeraSort cost times
-| type/buffer capacity | 250G (compressed) | comment |
-|--------------------------------------|:------------------:|:--------------------------------------------------------:|
-| vanilla uniffle (grpc-based) / 10g | 5.3min (2.3m/3m) | 1.9G/s |
-| vanilla uniffle (grpc-based) / 300g | 5.6min (3.7m/1.9m) | GC occurs frequently / 2.5G/s |
-| vanilla uniffle (netty-based) / 10g | / | read failed. 2.5G/s (write is better due to zero copy) |
-| vanilla uniffle (netty-based) / 300g | / | app hang |
-| rust based shuffle server / 10g | 4.6min (2.2m/2.4m) | 2.0 G/s |
-| rust based shuffle server / 300g | 4min (1.5m/2.5m) | 3.5 G/s |
+| type/buffer capacity | 250G (compressed) | comment |
+|--------------------------------------|:------------------:|:------------------------------------------------------------------------:|
+| vanilla spark ess | 5.0min (2.2/2.8) | ess use 400 nodes but uniffle only one. But the rss speed is still fast! |
+| vanilla uniffle (grpc-based) / 10g | 5.3min (2.3m/3m) | 1.9G/s |
+| vanilla uniffle (grpc-based) / 300g | 5.6min (3.7m/1.9m) | GC occurs frequently / 2.5G/s |
+| vanilla uniffle (netty-based) / 10g | / | read failed. 2.5G/s (write is better due to zero copy) |
+| vanilla uniffle (netty-based) / 300g | / | app hang |
+| rust based shuffle server / 10g | 4.6min (2.2m/2.4m) | 2.4 G/s |
+| rust based shuffle server / 300g | 4min (1.5m/2.5m) | 3.5 G/s |
Compared with grpc based server, rust-based server has less memory footprint
and stable performance.
diff --git a/rust/experimental/server/src/store/hybrid.rs b/rust/experimental/server/src/store/hybrid.rs
index 32be53511..f9feaa091 100644
--- a/rust/experimental/server/src/store/hybrid.rs
+++ b/rust/experimental/server/src/store/hybrid.rs
@@ -230,7 +230,10 @@ impl HybridStore {
ctx.data_blocks.sort_by_key(|block| block.task_attempt_id);
// when throwing the data lost error, it should fast fail for this
partition data.
- let inserted = candidate_store.insert(ctx).await;
+ let inserted = candidate_store
+ .insert(ctx)
+ .instrument_await("inserting into the persistent store, invoking
[write]")
+ .await;
if let Err(err) = inserted {
match err {
WorkerError::PARTIAL_DATA_LOST(msg) => {
@@ -335,15 +338,19 @@ impl Store for HybridStore {
let store = self.clone();
let concurrency_limiter =
Arc::new(Semaphore::new(store.memory_spill_max_concurrency as
usize));
- self.runtime_manager.default_runtime.spawn(async move {
+ self.runtime_manager.write_runtime.spawn(async move {
while let Ok(message) = store.memory_spill_recv.recv().await {
let await_root = await_tree_registry
- .register(format!("hot->warm flush."))
+ .register(format!("hot->warm flush. uid: {:#?}",
&message.ctx.uid))
.await;
// using acquire_owned(), refer to
https://github.com/tokio-rs/tokio/issues/1998
- let concurrency_guarder =
- concurrency_limiter.clone().acquire_owned().await.unwrap();
+ let concurrency_guarder = concurrency_limiter
+ .clone()
+ .acquire_owned()
+ .instrument_await("waiting for the spill concurrent lock.")
+ .await
+ .unwrap();
TOTAL_MEMORY_SPILL_OPERATION.inc();
GAUGE_MEMORY_SPILL_OPERATION.inc();
@@ -358,6 +365,7 @@ impl Store for HybridStore {
}
match store_cloned
.memory_spill_to_persistent_store(message.ctx.clone(), message.id)
+
.instrument_await("memory_spill_to_persistent_store.")
.await
{
Ok(msg) => {
@@ -768,8 +776,9 @@ mod tests {
let reading_view_ctx = ReadingViewContext {
uid: uid.clone(),
reading_options:
ReadingOptions::FILE_OFFSET_AND_LEN(offset, length as i64),
- serialized_expected_task_ids_bitmap:
Default::default(),
+ serialized_expected_task_ids_bitmap: None,
};
+ println!("reading. offset: {:?}. len: {:?}", offset,
length);
let read_data = store.get(reading_view_ctx).await.unwrap();
match read_data {
ResponseData::Local(local_data) => {
diff --git a/rust/experimental/server/src/store/local/disk.rs
b/rust/experimental/server/src/store/local/disk.rs
new file mode 100644
index 000000000..a0e16af74
--- /dev/null
+++ b/rust/experimental/server/src/store/local/disk.rs
@@ -0,0 +1,372 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::runtime::manager::RuntimeManager;
+use anyhow::{anyhow, Result};
+use await_tree::InstrumentAwait;
+use bytes::{Bytes, BytesMut};
+use futures::AsyncWriteExt;
+use log::{error, info, warn};
+use opendal::services::Fs;
+use opendal::Operator;
+use std::io::SeekFrom;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::io::{AsyncReadExt, AsyncSeekExt};
+use tokio::sync::Semaphore;
+
+pub struct LocalDiskConfig { // Per-disk settings consumed by LocalDisk: capacity watermarks plus the append concurrency cap.
+ pub(crate) high_watermark: f32, // used-space ratio above which loop_check_disk marks a healthy disk unhealthy
+ pub(crate) low_watermark: f32, // used-space ratio below which loop_check_disk marks an unhealthy disk healthy again
+ pub(crate) max_concurrency: i32, // initial permit count of the semaphore acquired in LocalDisk::append
+}
+
+impl LocalDiskConfig {
+ pub fn create_mocked_config() -> Self { // Alternative config with fixed values; used by the local_disk_corruption_healthy_check test in this file.
+ LocalDiskConfig {
+ high_watermark: 1.0, // the capacity check compares `used_ratio > high_watermark`; presumably 1.0 is chosen so a full test machine never flips the disk to unhealthy - confirm
+ low_watermark: 0.6,
+ max_concurrency: 20,
+ }
+ }
+}
+
+impl Default for LocalDiskConfig {
+ fn default() -> Self { // Default settings: unhealthy above 80% used, healthy again below 60% used, 40 append permits.
+ LocalDiskConfig {
+ high_watermark: 0.8,
+ low_watermark: 0.6,
+ max_concurrency: 40,
+ }
+ }
+}
+
+pub struct LocalDisk { // One local directory exposed through an opendal Fs operator, plus its health/corruption flags.
+ pub(crate) root: String, // directory handed to the Fs builder as root and to the fs2 capacity queries
+ operator: Operator, // opendal operator used for every create_dir/write/append/stat/read/remove_all call
+ concurrency_limiter: Semaphore, // a permit is acquired for the duration of each append
+ is_corrupted: AtomicBool, // set to true by mark_corrupted when the periodic write/read probe fails
+ is_healthy: AtomicBool, // toggled by the capacity watermark check in loop_check_disk
+ config: LocalDiskConfig,
+}
+
+impl LocalDisk {
+ pub fn new( // Builds a disk rooted at `root` and spawns its background health-check loop on the default runtime.
+ root: String, // directory used as the opendal Fs root and for the fs2 capacity queries
+ config: LocalDiskConfig,
+ runtime_manager: RuntimeManager,
+ ) -> Arc<Self> {
+ let mut builder = Fs::default();
+ builder.root(&root);
+ let operator: Operator = Operator::new(builder).unwrap().finish(); // NOTE(review): unwrap panics if the operator cannot be built
+
+ let instance = LocalDisk {
+ root,
+ operator,
+ concurrency_limiter: Semaphore::new(config.max_concurrency as
usize), // NOTE(review): `as usize` on a negative i32 would wrap to a huge permit count - confirm the config is validated upstream
+ is_corrupted: AtomicBool::new(false),
+ is_healthy: AtomicBool::new(true), // a new disk reports healthy until the capacity check says otherwise
+ config,
+ };
+ let instance = Arc::new(instance);
+
+ let runtime = runtime_manager.default_runtime.clone();
+ let cloned = instance.clone(); // the spawned task holds its own Arc to the disk
+ runtime.spawn(async {
+ info!("Starting the disk healthy check, root: {}", &cloned.root);
+ LocalDisk::loop_check_disk(cloned).await;
+ });
+
+ instance
+ }
+
+ async fn write_read_check(local_disk: Arc<LocalDisk>) -> Result<()> { // Round-trips a probe file (delete, write, read, delete); Err on any I/O failure or on a content mismatch.
+ let temp_path = "corruption_check.file"; // relative path handed to the disk's own delete/write/read methods
+ // Clean up any probe file left over from a previous check.
+ local_disk.delete(temp_path).await?;
+
+ let written_data = Bytes::copy_from_slice(b"file corruption check");
+ local_disk.write(written_data.clone(), temp_path).await?;
+ let read_data = local_disk.read(temp_path, 0, None).await?; // length None => read() returns the whole file
+ local_disk.delete(temp_path).await?;
+
+ if written_data != read_data {
+ let msg = format!(
+ "The local disk has been corrupted. path: {}. expected: {:?},
actual: {:?}",
+ &local_disk.root, &written_data, &read_data
+ );
+ Err(anyhow!(msg))
+ } else {
+ Ok(())
+ }
+ }
+
+ async fn loop_check_disk(local_disk: Arc<LocalDisk>) { // Background loop: every 10s runs the corruption probe and then the capacity watermark check.
+ loop {
+ tokio::time::sleep(Duration::from_secs(10)).await; // the sleep comes first, so the first check happens 10s after start
+
+ if local_disk.is_corrupted().unwrap() { // is_corrupted() always returns Ok; once the flag is set the loop exits for good
+ return;
+ }
+
+ let check_succeed: Result<()> =
LocalDisk::write_read_check(local_disk.clone()).await;
+ if check_succeed.is_err() {
+ local_disk.mark_corrupted(); // any probe error (I/O failure or content mismatch) marks the disk corrupted
+ error!(
+ "Errors on checking local disk corruption. err: {:#?}",
+ check_succeed.err()
+ );
+ }
+
+ // Capacity check; note it still runs in the iteration that has just marked the disk corrupted.
+ let used_ratio = local_disk.get_disk_used_ratio();
+ if used_ratio.is_err() {
+ error!(
+ "Errors on getting the used ratio of the disk capacity.
err: {:?}",
+ used_ratio.err()
+ );
+ continue; // health flag is left unchanged when the ratio cannot be read
+ }
+
+ let used_ratio = used_ratio.unwrap();
+ if local_disk.is_healthy().unwrap()
+ && used_ratio > local_disk.config.high_watermark as f64
+ {
+ warn!("Disk={} has been unhealthy.", &local_disk.root);
+ local_disk.mark_unhealthy();
+ continue;
+ }
+
+ if !local_disk.is_healthy().unwrap()
+ && used_ratio < local_disk.config.low_watermark as f64 // stays unhealthy between the two watermarks
+ {
+ warn!("Disk={} has been healthy.", &local_disk.root);
+ local_disk.mark_healthy();
+ continue;
+ }
+ }
+ }
+
+ pub async fn create_dir(&self, dir: &str) -> Result<()> { // Creates `dir` through the operator; callers in this file pass paths ending in '/' (e.g. "a/").
+ self.operator.create_dir(dir).await?;
+ Ok(())
+ }
+
+ // One-shot write of `data` to `path` via Operator::write; the original author notes this ensures the data is flushed into the file.
+ async fn write(&self, data: Bytes, path: &str) -> Result<()> { // unlike append(), no concurrency permit is taken here
+ self.operator.write(path, data).await?;
+ Ok(())
+ }
+
+ pub async fn append(&self, data: Bytes, path: &str) -> Result<()> { // Appends `data` to `path` while holding one concurrency permit; errors from acquire/open/write/flush are propagated.
+ let _concurrency_guarder = self // permit is held until this function returns
+ .concurrency_limiter
+ .acquire()
+ .instrument_await("meet the concurrency limiter")
+ .await?;
+
+ let mut writer = self
+ .operator
+ .writer_with(path)
+ .append(true) // open the writer in append mode
+ .instrument_await("creating the writer...")
+ .await?;
+ // We must use write_all so that the whole buffer is consumed by the OS.
+ // Please see the detail:
https://doc.rust-lang.org/std/io/trait.Write.html#method.write_all
+ writer
+ .write_all(&*data)
+ .instrument_await("writing the data into buffer...")
+ .await?;
+ writer
+ .flush()
+ .instrument_await("committing the data into file...")
+ .await?;
+
+ Ok(())
+ }
+
+ pub async fn get_file_len(&self, path: &str) -> Result<i64> { // Returns the content length reported by stat; never returns Err.
+ match self.operator.stat(path).await {
+ Ok(meta) => Ok(meta.content_length() as i64),
+ Err(_) => Ok(0), // NOTE(review): every stat error, not only "not found", is reported as length 0
+ }
+ }
+
+ pub async fn read(&self, path: &str, offset: i64, length: Option<i64>) ->
Result<Bytes> { // Reads `length` bytes starting at `offset`, or the whole file when `length` is None.
+ if length.is_none() {
+ return Ok(Bytes::from(self.operator.read(path).await?)); // NOTE(review): `offset` is ignored on this path - confirm callers only pass 0 with None
+ }
+
+ let mut reader = self.operator.reader(path).await?;
+ reader.seek(SeekFrom::Start(offset as u64)).await?; // NOTE(review): a negative offset would wrap in the `as u64` cast
+
+ let mut buffer = vec![0; length.unwrap() as usize]; // NOTE(review): a negative length would wrap in the `as usize` cast
+ reader.read_exact(buffer.as_mut()).await?; // presumably fails when fewer than `length` bytes remain - confirm against read_exact docs
+
+ let mut bytes_buffer = BytesMut::new();
+ bytes_buffer.extend_from_slice(&*buffer); // copies the buffer once more before freezing it into Bytes
+ Ok(bytes_buffer.freeze())
+ }
+
+ pub async fn delete(&self, path: &str) -> Result<()> { // Removes `path` via Operator::remove_all; this file calls it with both a file name and a directory path ("a/").
+ self.operator.remove_all(path).await?;
+ Ok(())
+ }
+
+ fn mark_corrupted(&self) { // Sets the corrupted flag; nothing in this impl resets it to false.
+ self.is_corrupted.store(true, Ordering::SeqCst);
+ }
+
+ fn mark_unhealthy(&self) { // Clears the healthy flag; called by loop_check_disk when usage exceeds the high watermark.
+ self.is_healthy.store(false, Ordering::SeqCst);
+ }
+
+ fn mark_healthy(&self) { // Sets the healthy flag; called by loop_check_disk when usage drops below the low watermark.
+ self.is_healthy.store(true, Ordering::SeqCst);
+ }
+
+ pub fn is_corrupted(&self) -> Result<bool> { // Returns the corrupted flag; always Ok (the Result wrapper never carries an error here).
+ Ok(self.is_corrupted.load(Ordering::SeqCst))
+ }
+
+ pub fn is_healthy(&self) -> Result<bool> { // Returns the healthy flag; always Ok (the Result wrapper never carries an error here).
+ Ok(self.is_healthy.load(Ordering::SeqCst))
+ }
+
+ fn get_disk_used_ratio(&self) -> Result<f64> { // Returns 1 - available/total for the filesystem holding `root`; Err if either fs2 query fails.
+ // Get the total and available space in bytes
+ let available_space = fs2::available_space(&self.root)?;
+ let total_space = fs2::total_space(&self.root)?;
+ Ok(1.0 - (available_space as f64 / total_space as f64)) // NOTE(review): a reported total_space of 0 would make this non-finite - no guard here
+ }
+}
+
+#[cfg(test)]
+mod tests { // Unit tests for LocalDisk: delete, background health check, and append/read round-trips.
+ use crate::runtime::manager::RuntimeManager;
+ use crate::store::local::disk::{LocalDisk, LocalDiskConfig};
+ use bytes::Bytes;
+ use std::time::Duration;
+
+ #[test]
+ fn test_local_disk_delete_operation() { // delete("a/") must remove a file previously appended under that directory
+ let temp_dir =
tempdir::TempDir::new("test_local_disk_delete_operation-dir").unwrap();
+ let temp_path = temp_dir.path().to_str().unwrap().to_string();
+
+ println!("init the path: {}", &temp_path);
+
+ let runtime: RuntimeManager = Default::default();
+ let local_disk = LocalDisk::new(
+ temp_path.clone(),
+ LocalDiskConfig::default(),
+ runtime.clone(),
+ );
+
+ let data = b"hello!";
+ runtime.wait(local_disk.create_dir("a/")).unwrap();
+ runtime
+ .wait(local_disk.append(Bytes::copy_from_slice(data), "a/b"))
+ .unwrap();
+
+ assert_eq!( // the appended file exists on the real filesystem under the disk root
+ true,
+ runtime
+ .wait(tokio::fs::try_exists(format!(
+ "{}/{}",
+ &temp_path,
+ "a/b".to_string()
+ )))
+ .unwrap()
+ );
+
+ runtime
+ .wait(local_disk.delete("a/"))
+ .expect("TODO: panic message");
+ assert_eq!( // after deleting the directory the file is gone
+ false,
+ runtime
+ .wait(tokio::fs::try_exists(format!(
+ "{}/{}",
+ &temp_path,
+ "a/b".to_string()
+ )))
+ .unwrap()
+ );
+ }
+
+ #[test]
+ fn local_disk_corruption_healthy_check() { // a fresh disk must report healthy and not corrupted
+ let temp_dir = tempdir::TempDir::new("test_directory").unwrap();
+ let temp_path = temp_dir.path().to_str().unwrap().to_string();
+
+ let local_disk = LocalDisk::new(
+ temp_path.clone(),
+ LocalDiskConfig::create_mocked_config(),
+ Default::default(),
+ );
+
+ awaitility::at_most(Duration::from_secs(10)).until(||
local_disk.is_healthy().unwrap()); // NOTE(review): is_healthy starts as true, so this can pass before the first background check (which runs after a 10s sleep)
+ assert_eq!(false, local_disk.is_corrupted().unwrap());
+ }
+
+ #[test]
+ fn local_disk_test() { // appends the same payload twice, then reads it back by range and as a whole
+ let temp_dir = tempdir::TempDir::new("test_directory").unwrap();
+ let temp_path = temp_dir.path().to_str().unwrap().to_string();
+
+ let runtime: RuntimeManager = Default::default();
+ let local_disk = LocalDisk::new(
+ temp_path.clone(),
+ LocalDiskConfig::default(),
+ runtime.clone(),
+ );
+
+ let data = b"Hello, World!";
+
+ let relative_path = "app-id/test_file.txt";
+
+ runtime.wait(local_disk.create_dir("app-id/")).unwrap();
+
+ for _ in 0..2 { // two appends => file content is the payload repeated twice
+ let write_result =
+ runtime.wait(local_disk.append(Bytes::copy_from_slice(data),
relative_path));
+ assert!(write_result.is_ok());
+ }
+
+ let read_result = runtime.wait(local_disk.read(relative_path, 0,
Some(data.len() as i64)));
+ assert!(read_result.is_ok());
+ let read_data = read_result.unwrap();
+ let expected = b"Hello, World!";
+ assert_eq!(read_data.as_ref(), expected);
+
+ // read the second copy of the payload (offset = data.len())
+ let read_result = runtime.wait(local_disk.read(
+ relative_path,
+ data.len() as i64,
+ Some(data.len() as i64),
+ ));
+ assert_eq!(read_result.unwrap().as_ref(), expected);
+
+ // read the whole file (length = None)
+ let read_result = runtime.wait(local_disk.read(relative_path, 0,
None));
+ let expected = b"Hello, World!Hello, World!";
+ assert_eq!(read_result.unwrap().as_ref(), expected);
+
+ temp_dir.close().unwrap();
+ }
+}
diff --git a/rust/experimental/server/src/store/local/mod.rs
b/rust/experimental/server/src/store/local/mod.rs
new file mode 100644
index 000000000..63ef41a5b
--- /dev/null
+++ b/rust/experimental/server/src/store/local/mod.rs
@@ -0,0 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod disk;
diff --git a/rust/experimental/server/src/store/localfile.rs
b/rust/experimental/server/src/store/localfile.rs
index e527d8cdb..cc2a881f6 100644
--- a/rust/experimental/server/src/store/localfile.rs
+++ b/rust/experimental/server/src/store/localfile.rs
@@ -28,38 +28,42 @@ use crate::store::{
LocalDataIndex, PartitionedLocalData, Persistent, RequireBufferResponse,
ResponseData,
ResponseDataIndex, Store,
};
+use std::ops::Deref;
+use std::path::Path;
use anyhow::Result;
use async_trait::async_trait;
use await_tree::InstrumentAwait;
-use bytes::{BufMut, Bytes, BytesMut};
+use bytes::{BufMut, BytesMut};
use dashmap::DashMap;
-use log::{debug, error, info, warn};
+use log::{debug, error, warn};
use crate::runtime::manager::RuntimeManager;
-use std::io::SeekFrom;
-use std::path::Path;
-use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
-use std::time::Duration;
-use tokio::fs::OpenOptions;
-use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
-use tokio::sync::{RwLock, Semaphore};
-
-fn create_directory_if_not_exists(dir_path: &str) {
- if !std::fs::metadata(dir_path).is_ok() {
- std::fs::create_dir_all(dir_path).expect("Errors on creating dirs.");
+
+use crate::store::local::disk::{LocalDisk, LocalDiskConfig};
+
+struct LockedObj { // Per-partition entry stored in LocalFileStore::partition_locks, keyed by the data file path.
+ disk: Arc<LocalDisk>, // disk chosen via select_disk when the entry is first created
+ pointer: AtomicI64, // offset bookkeeping: insert loads it as the starting offset and stores the advanced value after appending
+}
+
+impl From<Arc<LocalDisk>> for LockedObj {
+ fn from(value: Arc<LocalDisk>) -> Self { // Wraps a disk handle with a zero-initialised offset pointer.
+ Self {
+ disk: value.clone(), // NOTE(review): `value` is already owned, so this clone looks redundant
+ pointer: Default::default(), // AtomicI64 default, i.e. 0
+ }
}
}
pub struct LocalFileStore {
local_disks: Vec<Arc<LocalDisk>>,
- partition_written_disk_map: DashMap<String, DashMap<i32, DashMap<i32,
Arc<LocalDisk>>>>,
- partition_file_locks: DashMap<String, Arc<RwLock<()>>>,
healthy_check_min_disks: i32,
-
runtime_manager: RuntimeManager,
+ partition_locks: DashMap<String, Arc<LockedObj>>,
}
impl Persistent for LocalFileStore {}
@@ -81,10 +85,9 @@ impl LocalFileStore {
}
LocalFileStore {
local_disks: local_disk_instances,
- partition_written_disk_map: DashMap::new(),
- partition_file_locks: DashMap::new(),
healthy_check_min_disks: 1,
runtime_manager,
+ partition_locks: Default::default(),
}
}
@@ -101,10 +104,9 @@ impl LocalFileStore {
}
LocalFileStore {
local_disks: local_disk_instances,
- partition_written_disk_map: DashMap::new(),
- partition_file_locks: DashMap::new(),
healthy_check_min_disks:
localfile_config.healthy_check_min_disks.unwrap_or(1),
runtime_manager,
+ partition_locks: Default::default(),
}
}
@@ -129,69 +131,6 @@ impl LocalFileStore {
)
}
- fn get_app_all_partitions(&self, app_id: &str) -> Vec<(i32, i32)> {
- let stage_entry = self.partition_written_disk_map.get(app_id);
- if stage_entry.is_none() {
- return vec![];
- }
-
- let stages = stage_entry.unwrap();
- let mut partition_ids = vec![];
- for stage_item in stages.iter() {
- let (shuffle_id, partitions) = stage_item.pair();
- for partition_item in partitions.iter() {
- let (partition_id, _) = partition_item.pair();
- partition_ids.push((*shuffle_id, *partition_id));
- }
- }
-
- partition_ids
- }
-
- fn delete_app(&self, app_id: &str) -> Result<()> {
- self.partition_written_disk_map.remove(app_id);
- Ok(())
- }
-
- fn get_owned_disk(&self, uid: PartitionedUId) -> Option<Arc<LocalDisk>> {
- let app_id = uid.app_id;
- let shuffle_id = uid.shuffle_id;
- let partition_id = uid.partition_id;
-
- let shuffle_entry = self
- .partition_written_disk_map
- .entry(app_id)
- .or_insert_with(|| DashMap::new());
- let partition_entry = shuffle_entry
- .entry(shuffle_id)
- .or_insert_with(|| DashMap::new());
-
- partition_entry
- .get(&partition_id)
- .map(|v| v.value().clone())
- }
-
- async fn get_or_create_owned_disk(&self, uid: PartitionedUId) ->
Result<Arc<LocalDisk>> {
- let uid_ref = &uid.clone();
- let app_id = uid.app_id;
- let shuffle_id = uid.shuffle_id;
- let partition_id = uid.partition_id;
-
- let shuffle_entry = self
- .partition_written_disk_map
- .entry(app_id)
- .or_insert_with(|| DashMap::new());
- let partition_entry = shuffle_entry
- .entry(shuffle_id)
- .or_insert_with(|| DashMap::new());
- let local_disk = partition_entry
- .entry(partition_id)
- .or_insert(self.select_disk(uid_ref).await?)
- .clone();
-
- Ok(local_disk)
- }
-
fn healthy_check(&self) -> Result<bool> {
let mut available = 0;
for local_disk in &self.local_disks {
@@ -207,7 +146,7 @@ impl LocalFileStore {
Ok(available > self.healthy_check_min_disks)
}
- async fn select_disk(&self, uid: &PartitionedUId) ->
Result<Arc<LocalDisk>, WorkerError> {
+ fn select_disk(&self, uid: &PartitionedUId) -> Result<Arc<LocalDisk>,
WorkerError> {
let hash_value = PartitionedUId::get_hash(uid);
let mut candidates = vec![];
@@ -244,36 +183,33 @@ impl Store for LocalFileStore {
}
let uid = ctx.uid;
- let _pid = uid.partition_id;
let (data_file_path, index_file_path) =
LocalFileStore::gen_relative_path_for_partition(&uid);
- let local_disk = self.get_or_create_owned_disk(uid.clone()).await?;
-
- if local_disk.is_corrupted()? {
- return Err(WorkerError::PARTIAL_DATA_LOST(
- local_disk.base_path.to_string(),
- ));
- }
- let lock_cloned = self
- .partition_file_locks
+ let mut parent_dir_is_created = false;
+ let locked_obj = self
+ .partition_locks
.entry(data_file_path.clone())
- .or_insert_with(|| Arc::new(RwLock::new(())))
+ .or_insert_with(|| {
+ parent_dir_is_created = true;
+ Arc::new(LockedObj::from(self.select_disk(&uid).unwrap()))
+ })
.clone();
- let _lock_guard = lock_cloned
- .write()
- .instrument_await(format!(
- "localfile partition file lock. path: {}",
- &data_file_path
- ))
- .await;
- // write index file and data file
- // todo: split multiple pieces
- let mut next_offset = local_disk
- .get_file_len(data_file_path.clone())
- .instrument_await(format!("getting the file len. path: {}",
&data_file_path))
- .await?;
+ let local_disk = &locked_obj.disk;
+ let mut next_offset = locked_obj.pointer.load(Ordering::SeqCst);
+
+ if local_disk.is_corrupted()? {
+ return
Err(WorkerError::PARTIAL_DATA_LOST(local_disk.root.to_string()));
+ }
+
+ if !parent_dir_is_created {
+ if let Some(path) = Path::new(&data_file_path).parent() {
+ local_disk
+ .create_dir(format!("{:?}/",
path.to_str().unwrap()).as_str())
+ .await?;
+ }
+ }
let mut index_bytes_holder = BytesMut::new();
let mut data_bytes_holder = BytesMut::new();
@@ -296,25 +232,30 @@ impl Store for LocalFileStore {
index_bytes_holder.put_i64(task_attempt_id);
let data = block.data;
- // if get_crc(&data) != crc {
- // error!("The crc value is not the same. partition id: {},
block id: {}", pid, block_id);
- // }
data_bytes_holder.extend_from_slice(&data);
next_offset += length as i64;
}
local_disk
- .write(data_bytes_holder.freeze(), data_file_path.clone())
- .instrument_await(format!("localfile writing data. path: {}",
data_file_path))
+ .append(data_bytes_holder.freeze(), &data_file_path)
+ .instrument_await(format!("localfile writing data. path: {}",
&data_file_path))
.await?;
local_disk
- .write(index_bytes_holder.freeze(), index_file_path.clone())
- .instrument_await(format!("localfile writing index. path: {}",
data_file_path))
+ .append(index_bytes_holder.freeze(), &index_file_path)
+ .instrument_await(format!(
+ "localfile writing index. path: {}",
+ &index_file_path
+ ))
.await?;
TOTAL_LOCALFILE_USED.inc_by(total_size as u64);
+ locked_obj
+ .deref()
+ .pointer
+ .store(next_offset, Ordering::SeqCst);
+
Ok(())
}
@@ -333,21 +274,10 @@ impl Store for LocalFileStore {
}
let (data_file_path, _) =
LocalFileStore::gen_relative_path_for_partition(&uid);
- let lock_cloned = self
- .partition_file_locks
- .entry(data_file_path.clone())
- .or_insert_with(|| Arc::new(RwLock::new(())))
- .clone();
- let _lock_guard = lock_cloned
- .read()
- .instrument_await("getting file read lock")
- .await;
-
- let local_disk: Option<Arc<LocalDisk>> =
self.get_owned_disk(uid.clone());
- if local_disk.is_none() {
+ if !self.partition_locks.contains_key(&data_file_path) {
warn!(
- "This should not happen of local disk not found for [{:?}]",
+ "There is no cached data in localfile store for [{:?}]",
&uid
);
return Ok(ResponseData::Local(PartitionedLocalData {
@@ -355,17 +285,26 @@ impl Store for LocalFileStore {
}));
}
- let local_disk = local_disk.unwrap();
+ let locked_object = self
+ .partition_locks
+ .entry(data_file_path.clone())
+ .or_insert_with(||
Arc::new(LockedObj::from(self.select_disk(&uid).unwrap())))
+ .clone();
+
+ let local_disk = &locked_object.disk;
if local_disk.is_corrupted()? {
return Err(WorkerError::LOCAL_DISK_OWNED_BY_PARTITION_CORRUPTED(
- local_disk.base_path.to_string(),
+ local_disk.root.to_string(),
));
}
let data = local_disk
- .read(data_file_path, offset, Some(len))
- .instrument_await("getting data from localfile")
+ .read(&data_file_path, offset, Some(len))
+ .instrument_await(format!(
+ "getting data from localfile: {:?}",
+ &data_file_path
+ ))
.await?;
Ok(ResponseData::Local(PartitionedLocalData { data }))
}
@@ -378,21 +317,9 @@ impl Store for LocalFileStore {
let (data_file_path, index_file_path) =
LocalFileStore::gen_relative_path_for_partition(&uid);
- let lock_cloned = self
- .partition_file_locks
- .entry(data_file_path.clone())
- .or_insert_with(|| Arc::new(RwLock::new(())))
- .clone();
- let _lock_guard = lock_cloned
- .read()
- .instrument_await("waiting file lock to read index data")
- .await;
-
- let local_disk: Option<Arc<LocalDisk>> =
self.get_owned_disk(uid.clone());
-
- if local_disk.is_none() {
+ if !self.partition_locks.contains_key(&data_file_path) {
warn!(
- "This should not happen of local disk not found for [{:?}]",
+ "There is no cached data in localfile store for [{:?}]",
&uid
);
return Ok(Local(LocalDataIndex {
@@ -401,21 +328,29 @@ impl Store for LocalFileStore {
}));
}
- let local_disk = local_disk.unwrap();
+ let locked_object = self
+ .partition_locks
+ .entry(data_file_path.clone())
+ .or_insert_with(||
Arc::new(LockedObj::from(self.select_disk(&uid).unwrap())))
+ .clone();
+ let local_disk = &locked_object.disk;
if local_disk.is_corrupted()? {
return Err(WorkerError::LOCAL_DISK_OWNED_BY_PARTITION_CORRUPTED(
- local_disk.base_path.to_string(),
+ local_disk.root.to_string(),
));
}
let index_data_result = local_disk
- .read(index_file_path, 0, None)
- .instrument_await("reading index data from file")
+ .read(&index_file_path, 0, None)
+ .instrument_await(format!(
+ "reading index data from file: {:?}",
+ &index_file_path
+ ))
.await?;
let len = local_disk
- .get_file_len(data_file_path)
- .instrument_await("getting file len from file")
+ .get_file_len(&data_file_path)
+ .instrument_await(format!("getting file len from file: {:?}",
&data_file_path))
.await?;
Ok(Local(LocalDataIndex {
index_data: index_data_result,
@@ -423,17 +358,6 @@ impl Store for LocalFileStore {
}))
}
- async fn require_buffer(
- &self,
- _ctx: RequireBufferContext,
- ) -> Result<RequireBufferResponse, WorkerError> {
- todo!()
- }
-
- async fn release_buffer(&self, _ctx: ReleaseBufferContext) -> Result<i64,
WorkerError> {
- todo!()
- }
-
async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
let app_id = ctx.app_id;
let shuffle_id_option = ctx.shuffle_id;
@@ -445,28 +369,18 @@ impl Store for LocalFileStore {
for local_disk_ref in &self.local_disks {
let disk = local_disk_ref.clone();
- disk.delete(data_relative_dir_path.to_string()).await?;
+ disk.delete(&data_relative_dir_path).await?;
}
- if shuffle_id_option.is_none() {
- let all_partition_ids = self.get_app_all_partitions(&app_id);
- if all_partition_ids.is_empty() {
- return Ok(());
- }
-
- for (shuffle_id, partition_id) in all_partition_ids.into_iter() {
- // delete lock
- let uid = PartitionedUId {
- app_id: app_id.clone(),
- shuffle_id,
- partition_id,
- };
- let (data_file_path, _) =
LocalFileStore::gen_relative_path_for_partition(&uid);
- self.partition_file_locks.remove(&data_file_path);
- }
+ let keys_to_delete: Vec<_> = self
+ .partition_locks
+ .iter()
+ .filter(|entry| entry.key().starts_with(&data_relative_dir_path))
+ .map(|entry| entry.key().to_string())
+ .collect();
- // delete disk mapping
- self.delete_app(&app_id)?;
+ for key in keys_to_delete {
+ self.partition_locks.remove(&key);
}
Ok(())
@@ -475,280 +389,16 @@ impl Store for LocalFileStore {
async fn is_healthy(&self) -> Result<bool> {
self.healthy_check()
}
-}
-
-struct LocalDiskConfig {
- high_watermark: f32,
- low_watermark: f32,
- max_concurrency: i32,
-}
-
-impl LocalDiskConfig {
- fn create_mocked_config() -> Self {
- LocalDiskConfig {
- high_watermark: 1.0,
- low_watermark: 0.6,
- max_concurrency: 20,
- }
- }
-}
-
-impl Default for LocalDiskConfig {
- fn default() -> Self {
- LocalDiskConfig {
- high_watermark: 0.8,
- low_watermark: 0.6,
- max_concurrency: 40,
- }
- }
-}
-
-struct LocalDisk {
- base_path: String,
- concurrency_limiter: Semaphore,
- is_corrupted: AtomicBool,
- is_healthy: AtomicBool,
- config: LocalDiskConfig,
-}
-
-impl LocalDisk {
- fn new(path: String, config: LocalDiskConfig, runtime_manager:
RuntimeManager) -> Arc<Self> {
- create_directory_if_not_exists(&path);
- let instance = LocalDisk {
- base_path: path,
- concurrency_limiter: Semaphore::new(config.max_concurrency as
usize),
- is_corrupted: AtomicBool::new(false),
- is_healthy: AtomicBool::new(true),
- config,
- };
- let instance = Arc::new(instance);
-
- let runtime = runtime_manager.default_runtime.clone();
- let cloned = instance.clone();
- runtime.spawn(async {
- info!(
- "Starting the disk healthy checking, base path: {}",
- &cloned.base_path
- );
- LocalDisk::loop_check_disk(cloned).await;
- });
-
- instance
- }
-
- async fn write_read_check(local_disk: Arc<LocalDisk>) -> Result<()> {
- let temp_path = format!("{}/{}", &local_disk.base_path,
"corruption_check.file");
- let data = Bytes::copy_from_slice(b"file corruption check");
- {
- let mut file = OpenOptions::new()
- .write(true)
- .create(true)
- .open(&temp_path)
- .await?;
- file.write_all(&data).await?;
- file.flush().await?;
- }
-
- let mut read_data = Vec::new();
- {
- let mut file = tokio::fs::File::open(&temp_path).await?;
- file.read_to_end(&mut read_data).await?;
-
- tokio::fs::remove_file(&temp_path).await?;
- }
-
- if data != Bytes::copy_from_slice(&read_data) {
- local_disk.mark_corrupted();
- error!(
- "The local disk has been corrupted. path: {}",
- &local_disk.base_path
- );
- }
-
- Ok(())
- }
-
- async fn loop_check_disk(local_disk: Arc<LocalDisk>) {
- loop {
- tokio::time::sleep(Duration::from_secs(10)).await;
-
- if local_disk.is_corrupted().unwrap() {
- return;
- }
-
- let check_succeed: Result<()> =
LocalDisk::write_read_check(local_disk.clone()).await;
- if check_succeed.is_err() {
- local_disk.mark_corrupted();
- error!(
- "Errors on checking local disk corruption. err: {:#?}",
- check_succeed.err()
- );
- }
-
- // check the capacity
- let used_ratio = local_disk.get_disk_used_ratio();
- if used_ratio.is_err() {
- error!(
- "Errors on getting the used ratio of the disk capacity.
err: {:?}",
- used_ratio.err()
- );
- continue;
- }
-
- let used_ratio = used_ratio.unwrap();
- if local_disk.is_healthy().unwrap()
- && used_ratio > local_disk.config.high_watermark as f64
- {
- warn!("Disk={} has been unhealthy.", &local_disk.base_path);
- local_disk.mark_unhealthy();
- continue;
- }
-
- if !local_disk.is_healthy().unwrap()
- && used_ratio < local_disk.config.low_watermark as f64
- {
- warn!("Disk={} has been healthy.", &local_disk.base_path);
- local_disk.mark_healthy();
- continue;
- }
- }
- }
-
- fn append_path(&self, path: String) -> String {
- format!("{}/{}", self.base_path.clone(), path)
- }
-
- async fn write(&self, data: Bytes, relative_file_path: String) ->
Result<()> {
- let _concurrency_guarder = self
- .concurrency_limiter
- .acquire()
- .instrument_await("meet the concurrency limiter")
- .await?;
- let absolute_path = self.append_path(relative_file_path.clone());
- let path = Path::new(&absolute_path);
-
- match path.parent() {
- Some(parent) => {
- if !parent.exists() {
- create_directory_if_not_exists(parent.to_str().unwrap())
- }
- }
- _ => todo!(),
- }
-
- debug!("data file: {}", &absolute_path);
-
- let mut output_file = OpenOptions::new()
- .append(true)
- .create(true)
- .open(absolute_path)
- .await?;
- output_file.write_all(data.as_ref()).await?;
- output_file.flush().await?;
-
- Ok(())
- }
- async fn get_file_len(&self, relative_file_path: String) -> Result<i64> {
- let file_path = self.append_path(relative_file_path);
-
- Ok(
- match tokio::fs::metadata(file_path)
- .instrument_await("getting metadata of path")
- .await
- {
- Ok(metadata) => metadata.len() as i64,
- _ => 0i64,
- },
- )
- }
-
- async fn read(
+ async fn require_buffer(
&self,
- relative_file_path: String,
- offset: i64,
- length: Option<i64>,
- ) -> Result<Bytes> {
- let file_path = self.append_path(relative_file_path);
-
- let file = tokio::fs::File::open(&file_path)
- .instrument_await(format!("opening file. path: {}", &file_path))
- .await?;
-
- let read_len = match length {
- Some(len) => len,
- _ => file
- .metadata()
- .instrument_await(format!("getting file metadata. path: {}",
&file_path))
- .await?
- .len()
- .try_into()
- .unwrap(),
- } as usize;
-
- let mut reader = tokio::io::BufReader::new(file);
- let mut buffer = vec![0; read_len];
- reader
- .seek(SeekFrom::Start(offset as u64))
- .instrument_await(format!(
- "seeking file [{}:{}] of path: {}",
- offset, read_len, &file_path
- ))
- .await?;
- reader
- .read_exact(buffer.as_mut())
- .instrument_await(format!(
- "reading data of len: {} from path: {}",
- read_len, &file_path
- ))
- .await?;
-
- let mut bytes_buffer = BytesMut::new();
- bytes_buffer.extend_from_slice(&*buffer);
- Ok(bytes_buffer.freeze())
- }
-
- async fn delete(&self, relative_file_path: String) -> Result<()> {
- let delete_path = self.append_path(relative_file_path);
- if !tokio::fs::try_exists(&delete_path).await? {
- info!("The path:{} does not exist, ignore purging.", &delete_path);
- return Ok(());
- }
-
- let metadata = tokio::fs::metadata(&delete_path).await?;
- if metadata.is_dir() {
- tokio::fs::remove_dir_all(delete_path).await?;
- } else {
- tokio::fs::remove_file(delete_path).await?;
- }
- Ok(())
- }
-
- fn mark_corrupted(&self) {
- self.is_corrupted.store(true, Ordering::SeqCst);
- }
-
- fn mark_unhealthy(&self) {
- self.is_healthy.store(false, Ordering::SeqCst);
- }
-
- fn mark_healthy(&self) {
- self.is_healthy.store(true, Ordering::SeqCst);
- }
-
- fn is_corrupted(&self) -> Result<bool> {
- Ok(self.is_corrupted.load(Ordering::SeqCst))
- }
-
- fn is_healthy(&self) -> Result<bool> {
- Ok(self.is_healthy.load(Ordering::SeqCst))
+ _ctx: RequireBufferContext,
+ ) -> Result<RequireBufferResponse, WorkerError> {
+ todo!()
}
- fn get_disk_used_ratio(&self) -> Result<f64> {
- // Get the total and available space in bytes
- let available_space = fs2::available_space(&self.base_path)?;
- let total_space = fs2::total_space(&self.base_path)?;
- Ok(1.0 - (available_space as f64 / total_space as f64))
+ async fn release_buffer(&self, _ctx: ReleaseBufferContext) -> Result<i64,
WorkerError> {
+ todo!()
}
}
@@ -758,17 +408,12 @@ mod test {
PartitionedUId, PurgeDataContext, ReadingIndexViewContext,
ReadingOptions,
ReadingViewContext, WritingViewContext,
};
- use crate::store::localfile::{LocalDisk, LocalDiskConfig, LocalFileStore};
+ use crate::store::localfile::LocalFileStore;
use crate::store::{PartitionedDataBlock, ResponseData, ResponseDataIndex,
Store};
use bytes::{Buf, Bytes, BytesMut};
use log::info;
- use crate::runtime::manager::RuntimeManager;
- use std::io::Read;
- use std::thread;
- use std::time::Duration;
-
#[test]
fn purge_test() -> anyhow::Result<()> {
let temp_dir = tempdir::TempDir::new("test_local_store").unwrap();
@@ -840,10 +485,6 @@ mod test {
false,
runtime.wait(tokio::fs::try_exists(format!("{}/{}", &temp_path,
&app_id)))?
);
- assert!(!local_store
- .partition_file_locks
- .contains_key(&format!("{}/{}/{}/{}.data", &temp_path, &app_id, 0,
0)));
- assert!(!local_store.partition_written_disk_map.contains_key(&app_id));
Ok(())
}
@@ -968,109 +609,4 @@ mod test {
temp_dir.close().unwrap();
}
-
- #[test]
- fn test_local_disk_delete_operation() {
- let temp_dir =
tempdir::TempDir::new("test_local_disk_delete_operation-dir").unwrap();
- let temp_path = temp_dir.path().to_str().unwrap().to_string();
-
- println!("init the path: {}", &temp_path);
-
- let runtime: RuntimeManager = Default::default();
- let local_disk = LocalDisk::new(
- temp_path.clone(),
- LocalDiskConfig::default(),
- runtime.clone(),
- );
-
- let data = b"hello!";
- runtime
- .wait(local_disk.write(Bytes::copy_from_slice(data),
"a/b".to_string()))
- .unwrap();
-
- assert_eq!(
- true,
- runtime
- .wait(tokio::fs::try_exists(format!(
- "{}/{}",
- &temp_path,
- "a/b".to_string()
- )))
- .unwrap()
- );
-
- runtime
- .wait(local_disk.delete("a/".to_string()))
- .expect("TODO: panic message");
- assert_eq!(
- false,
- runtime
- .wait(tokio::fs::try_exists(format!(
- "{}/{}",
- &temp_path,
- "a/b".to_string()
- )))
- .unwrap()
- );
- }
-
- #[test]
- fn local_disk_corruption_healthy_check() {
- let temp_dir = tempdir::TempDir::new("test_directory").unwrap();
- let temp_path = temp_dir.path().to_str().unwrap().to_string();
-
- let local_disk = LocalDisk::new(
- temp_path.clone(),
- LocalDiskConfig::create_mocked_config(),
- Default::default(),
- );
-
- thread::sleep(Duration::from_secs(12));
- assert_eq!(true, local_disk.is_healthy().unwrap());
- assert_eq!(false, local_disk.is_corrupted().unwrap());
- }
-
- #[test]
- fn local_disk_test() {
- let temp_dir = tempdir::TempDir::new("test_directory").unwrap();
- let temp_path = temp_dir.path().to_str().unwrap().to_string();
-
- let runtime: RuntimeManager = Default::default();
- let local_disk = LocalDisk::new(
- temp_path.clone(),
- LocalDiskConfig::default(),
- runtime.clone(),
- );
-
- let data = b"Hello, World!";
-
- let relative_path = "app-id/test_file.txt";
- let write_result =
- runtime.wait(local_disk.write(Bytes::copy_from_slice(data),
relative_path.to_string()));
- assert!(write_result.is_ok());
-
- // test whether the content is written
- let file_path = format!("{}/{}", local_disk.base_path, relative_path);
- let mut file = std::fs::File::open(file_path).unwrap();
- let mut file_content = Vec::new();
- file.read_to_end(&mut file_content).unwrap();
- assert_eq!(file_content, data);
-
- // if the file has been created, append some content
- let write_result =
- runtime.wait(local_disk.write(Bytes::copy_from_slice(data),
relative_path.to_string()));
- assert!(write_result.is_ok());
-
- let read_result = runtime.wait(local_disk.read(
- relative_path.to_string(),
- 0,
- Some(data.len() as i64 * 2),
- ));
- assert!(read_result.is_ok());
- let read_data = read_result.unwrap();
- let expected = b"Hello, World!Hello, World!";
- assert_eq!(read_data.as_ref(), expected);
-
- temp_dir.close().unwrap();
- }
}
diff --git a/rust/experimental/server/src/store/mod.rs
b/rust/experimental/server/src/store/mod.rs
index f29c7bf56..caf00a068 100644
--- a/rust/experimental/server/src/store/mod.rs
+++ b/rust/experimental/server/src/store/mod.rs
@@ -18,6 +18,7 @@
#[cfg(feature = "hdfs")]
pub mod hdfs;
pub mod hybrid;
+pub mod local;
pub mod localfile;
pub mod mem;
pub mod memory;