This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 602611cc1 [#1414] feat(rust): introduce native hdfs client (#1415)
602611cc1 is described below
commit 602611cc1adff995ab23b5e3776b1600f258d095
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Jan 4 10:19:10 2024 +0800
[#1414] feat(rust): introduce native hdfs client (#1415)
### What changes were proposed in this pull request?
Introduce the hdfs-native crate as the native HDFS client integration.
### Why are the changes needed?
For: #1414
A native Rust HDFS client simplifies the Hadoop dependency and avoids the
cost of JNI invocation.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested internally against Hadoop 3.1.2.
---
.github/workflows/rust.yml | 9 +-
rust/experimental/server/Cargo.lock | 302 ++++++++++++++++++-----------
rust/experimental/server/Cargo.toml | 8 +-
rust/experimental/server/README.md | 15 +-
rust/experimental/server/build.rs | 42 +---
rust/experimental/server/src/store/hdfs.rs | 119 ++++--------
6 files changed, 250 insertions(+), 245 deletions(-)
diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index e3d2273a3..29c60fcbd 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -31,11 +31,10 @@ jobs:
- "memory-prof"
runs-on: ubuntu-latest
steps:
- - name: Set up JDK 8
- uses: actions/setup-java@v3
- with:
- java-version: 8
- distribution: 'temurin'
+
+ - name: Install native libs
+ run: sudo apt-get install -y libkrb5-dev libgsasl-dev
+
- name: Install Protoc
uses: arduino/setup-protoc@v2
with:
diff --git a/rust/experimental/server/Cargo.lock b/rust/experimental/server/Cargo.lock
index 6c0466225..94a281877 100644
--- a/rust/experimental/server/Cargo.lock
+++ b/rust/experimental/server/Cargo.lock
@@ -61,21 +61,6 @@ dependencies = [
"futures-core",
]
-[[package]]
-name = "async-lock"
-version = "2.8.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b"
-dependencies = [
- "event-listener",
-]
-
-[[package]]
-name = "async-task"
-version = "4.4.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae"
-
[[package]]
name = "async-trait"
version = "0.1.73"
@@ -87,12 +72,6 @@ dependencies = [
"syn 2.0.37",
]
-[[package]]
-name = "atomic-waker"
-version = "1.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1181e1e0d1fce796a03db1ae795d67167da795f9cf4a39c37589e85ef57f26d3"
-
[[package]]
name = "atty"
version = "0.2.14"
@@ -205,6 +184,28 @@ version = "0.21.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2"
+[[package]]
+name = "bindgen"
+version = "0.64.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c4243e6031260db77ede97ad86c27e501d646a27ab57b59a574f725d98ab1fb4"
+dependencies = [
+ "bitflags 1.3.2",
+ "cexpr",
+ "clang-sys",
+ "lazy_static",
+ "lazycell",
+ "log",
+ "peeking_take_while",
+ "proc-macro2",
+ "quote",
+ "regex",
+ "rustc-hash",
+ "shlex",
+ "syn 1.0.109",
+ "which",
+]
+
[[package]]
name = "bitflags"
version = "1.3.2"
@@ -226,21 +227,6 @@ dependencies = [
"generic-array",
]
-[[package]]
-name = "blocking"
-version = "1.3.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "77231a1c8f801696fc0123ec6150ce92cffb8e164a02afb9c8ddee0e9b65ad65"
-dependencies = [
- "async-channel",
- "async-lock",
- "async-task",
- "atomic-waker",
- "fastrand 1.9.0",
- "futures-lite",
- "log",
-]
-
[[package]]
name = "bumpalo"
version = "3.14.0"
@@ -292,12 +278,32 @@ dependencies = [
"libc",
]
+[[package]]
+name = "cexpr"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
+dependencies = [
+ "nom",
+]
+
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
+[[package]]
+name = "clang-sys"
+version = "1.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "67523a3b4be3ce1989d607a828d036249522dd9c1c8de7f4dd2dae43a37369d1"
+dependencies = [
+ "glob",
+ "libc",
+ "libloading",
+]
+
[[package]]
name = "clap"
version = "3.2.25"
@@ -465,6 +471,21 @@ dependencies = [
"libc",
]
+[[package]]
+name = "crc"
+version = "3.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "86ec7a15cbe22e59248fc7eadb1907dab5ba09372595da4d73dd805ed4417dfe"
+dependencies = [
+ "crc-catalog",
+]
+
+[[package]]
+name = "crc-catalog"
+version = "2.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
+
[[package]]
name = "crc32fast"
version = "1.3.2"
@@ -791,15 +812,6 @@ dependencies = [
"once_cell",
]
-[[package]]
-name = "fastrand"
-version = "1.9.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be"
-dependencies = [
- "instant",
-]
-
[[package]]
name = "fastrand"
version = "2.0.0"
@@ -937,21 +949,6 @@ version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
-[[package]]
-name = "futures-lite"
-version = "1.13.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce"
-dependencies = [
- "fastrand 1.9.0",
- "futures-core",
- "futures-io",
- "memchr",
- "parking",
- "pin-project-lite",
- "waker-fn",
-]
-
[[package]]
name = "futures-macro"
version = "0.3.28"
@@ -1005,6 +1002,34 @@ dependencies = [
"pin-project 0.4.30",
]
+[[package]]
+name = "g2gen"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fc2c7625b2fc250dd90b63f7887a6bb0f7ec1d714c8278415bea2669ef20820e"
+dependencies = [
+ "g2poly",
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
+[[package]]
+name = "g2p"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fc36d9bdc3d2da057775a9f4fa7d7b09edab3e0eda7a92cc353358fa63b8519e"
+dependencies = [
+ "g2gen",
+ "g2poly",
+]
+
+[[package]]
+name = "g2poly"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "af6a86e750338603ea2c14b1c0bfe58cd61f87ca67a0021d9334996024608e12"
+
[[package]]
name = "generic-array"
version = "0.14.7"
@@ -1070,13 +1095,29 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a"
[[package]]
-name = "hdfs-sys"
-version = "0.3.0"
+name = "hdfs-native"
+version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "33e2d5cefba2d51a26b44d2a493f963a32725a0f6593c91be4a610ad449c49cb"
+checksum = "824c00b5e1b0ba3aeb5678debcb78bf3a26d3d6473e7fd0f53d1310c988c1f02"
dependencies = [
- "cc",
- "java-locator",
+ "base64 0.21.4",
+ "bytes 1.5.0",
+ "crc",
+ "futures",
+ "g2p",
+ "libc",
+ "libgssapi",
+ "log",
+ "num-traits",
+ "prost",
+ "prost-types",
+ "roxmltree",
+ "socket2 0.5.4",
+ "thiserror",
+ "tokio",
+ "url",
+ "users",
+ "uuid",
]
[[package]]
@@ -1092,20 +1133,6 @@ dependencies = [
"num-traits",
]
-[[package]]
-name = "hdrs"
-version = "0.3.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "00a7b465f2d12e45db2af56af13a1cbfe1d5616d54355f5610b26c0961dec7b7"
-dependencies = [
- "blocking",
- "errno",
- "futures",
- "hdfs-sys",
- "libc",
- "log",
-]
-
[[package]]
name = "headers"
version = "0.3.9"
@@ -1321,15 +1348,6 @@ dependencies = [
"str_stack",
]
-[[package]]
-name = "instant"
-version = "0.1.12"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
-dependencies = [
- "cfg-if",
-]
-
[[package]]
name = "io-lifetimes"
version = "1.0.11"
@@ -1373,16 +1391,6 @@ version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38"
-[[package]]
-name = "java-locator"
-version = "0.1.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "90003f2fd9c52f212c21d8520f1128da0080bad6fff16b68fe6e7f2f0c3780c2"
-dependencies = [
- "glob",
- "lazy_static",
-]
-
[[package]]
name = "js-sys"
version = "0.3.64"
@@ -1398,12 +1406,49 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
+[[package]]
+name = "lazycell"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
+
[[package]]
name = "libc"
version = "0.2.148"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b"
+[[package]]
+name = "libgssapi"
+version = "0.6.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9dcfb7f77cbefc242a46ea667491c4f1129712f563cd368623d3f1b261a90e5f"
+dependencies = [
+ "bitflags 2.4.0",
+ "bytes 1.5.0",
+ "lazy_static",
+ "libgssapi-sys",
+]
+
+[[package]]
+name = "libgssapi-sys"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "efdcdd31923aa6280d41ff2636fd93a18cc60fe25983b24887d1a8d24478cbfb"
+dependencies = [
+ "bindgen",
+]
+
+[[package]]
+name = "libloading"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c571b676ddfc9a8c12f1f3d3085a7b163966a8fd8098a90640953ce5f6170161"
+dependencies = [
+ "cfg-if",
+ "windows-sys 0.48.0",
+]
+
[[package]]
name = "linux-raw-sys"
version = "0.1.4"
@@ -1665,12 +1710,6 @@ version = "3.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f"
-[[package]]
-name = "parking"
-version = "2.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e"
-
[[package]]
name = "parking_lot"
version = "0.12.1"
@@ -1700,6 +1739,12 @@ version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c"
+[[package]]
+name = "peeking_take_while"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
+
[[package]]
name = "percent-encoding"
version = "2.3.0"
@@ -2261,12 +2306,27 @@ dependencies = [
"winapi",
]
+[[package]]
+name = "roxmltree"
+version = "0.18.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "862340e351ce1b271a378ec53f304a5558f7db87f3769dc655a8f6ecbb68b302"
+dependencies = [
+ "xmlparser",
+]
+
[[package]]
name = "rustc-demangle"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
+[[package]]
+name = "rustc-hash"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
+
[[package]]
name = "rustix"
version = "0.36.15"
@@ -2480,6 +2540,12 @@ dependencies = [
"lazy_static",
]
+[[package]]
+name = "shlex"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a7cee0529a6d40f580e7a5e6c495c8fbfe21b7b52795ed4bb5e62cdf92bc6380"
+
[[package]]
name = "signal-hook"
version = "0.3.17"
@@ -2664,7 +2730,7 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef"
dependencies = [
"cfg-if",
- "fastrand 2.0.0",
+ "fastrand",
"redox_syscall 0.3.5",
"rustix 0.38.14",
"windows-sys 0.48.0",
@@ -3208,7 +3274,7 @@ dependencies = [
"env_logger",
"fs2",
"futures",
- "hdrs",
+ "hdfs-native",
"hyper",
"log",
"once_cell",
@@ -3260,11 +3326,23 @@ dependencies = [
"percent-encoding",
]
+[[package]]
+name = "users"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "24cc0f6d6f267b73e5a2cadf007ba8f9bc39c6a6f9666f8cf25ea809a153b032"
+dependencies = [
+ "libc",
+]
+
[[package]]
name = "uuid"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d"
+dependencies = [
+ "getrandom",
+]
[[package]]
name = "valuable"
@@ -3284,12 +3362,6 @@ version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
-[[package]]
-name = "waker-fn"
-version = "1.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
-
[[package]]
name = "want"
version = "0.3.1"
@@ -3584,3 +3656,9 @@ dependencies = [
"cfg-if",
"windows-sys 0.48.0",
]
+
+[[package]]
+name = "xmlparser"
+version = "0.13.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4"
diff --git a/rust/experimental/server/Cargo.toml b/rust/experimental/server/Cargo.toml
index 8c1f97b39..acb082986 100644
--- a/rust/experimental/server/Cargo.toml
+++ b/rust/experimental/server/Cargo.toml
@@ -45,7 +45,7 @@ memory-prof = [
]
hdfs = [
- "dep:hdrs"
+ "dep:hdfs-native"
]
[dependencies]
@@ -98,10 +98,10 @@ socket2 = { version="0.4", features = ["all"]}
cap = "0.1.2"
spin = "0.9.8"
-[dependencies.hdrs]
-version = "0.3.0"
+[dependencies.hdfs-native]
+version = "0.5.0"
optional = true
-features = ["async_file"]
+features = ["kerberos"]
# jemalloc related optional dependencies
[dependencies.tikv-jemalloc-ctl]
diff --git a/rust/experimental/server/README.md b/rust/experimental/server/README.md
index cc8d1ec8d..73d48e833 100644
--- a/rust/experimental/server/README.md
+++ b/rust/experimental/server/README.md
@@ -133,15 +133,16 @@ push_gateway_endpoint =
"http://xxxxxxxxxxxxxx/pushgateway"
### HDFS Setup
-```shell
-export JAVA_HOME=/path/to/java
-export LD_LIBRARY_PATH=${JAVA_HOME}/jre/lib/amd64/server:${LD_LIBRARY_PATH}
-
-export HADOOP_HOME=/path/to/hadoop
-export CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob)
+Thanks to the hdfs-native crate, there is no need to set up JAVA_HOME
and related dependencies.
+```shell
cargo build --features hdfs --release
-```
+```
+
+```shell
+# configure the Kerberos and Hadoop configuration environment variables
+HADOOP_CONF_DIR=/etc/hadoop/conf KRB5_CONFIG=/etc/krb5.conf KRB5CCNAME=/tmp/krb5cc_2002 LOG=info ./uniffle-worker
+```
## Profiling
diff --git a/rust/experimental/server/build.rs b/rust/experimental/server/build.rs
index 2432c7328..74195cf8c 100644
--- a/rust/experimental/server/build.rs
+++ b/rust/experimental/server/build.rs
@@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+use std::fs;
use std::path::Path;
-use std::{env, fs};
fn main() -> Result<(), Box<dyn std::error::Error>> {
// generate the uniffle code for grpc server
@@ -31,11 +31,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// rename the generated filename to uniffle.rs
rename_file("src/proto/rss.common.rs", "src/proto/uniffle.rs");
- // only setup ld library path in debug mode
- let profile = std::env::var("PROFILE").unwrap();
- if profile == "debug" {
- setup_ld_library_path();
- }
Ok(())
}
@@ -46,38 +41,3 @@ fn rename_file(file_path: impl AsRef<Path>, renamed_path:
impl AsRef<Path>) {
}
fs::rename(&f, renamed_path).expect("Errors on renaming file.");
}
-
-fn setup_ld_library_path() {
- // java_home is required now to build and test
- let java_home = env::var("JAVA_HOME").expect("JAVA_HOME must be set");
- let possible_lib_paths = vec![
- format!("{java_home}/jre/lib/amd64/server/"),
- format!("{java_home}/lib/server"),
- format!("{java_home}/jre/lib/server"),
- format!("{java_home}/jre/lib/amd64/server"),
- ];
- let lib_jvm_path = possible_lib_paths
- .iter()
- .find(|&path| {
- let path = Path::new(&path);
- path.exists()
- })
- .expect("java_home is not valid");
- match env::consts::OS {
- "linux" => {
- let ld_path =
env::var_os("LD_LIBRARY_PATH").unwrap_or("".parse().unwrap());
- let ld_path = format!("{}:{}", ld_path.to_str().unwrap(),
lib_jvm_path);
- // this might be anti-pattern, but it works for our current setup
- println!("cargo:rustc-env=LD_LIBRARY_PATH={}", ld_path);
- }
- "macos" => {
- let ld_path =
env::var_os("DYLD_LIBRARY_PATH").unwrap_or("".parse().unwrap());
- let ld_path = format!("{}:{}", ld_path.to_str().unwrap(),
lib_jvm_path);
- // this might be anti-pattern, but it works for our current setup
- println!("cargo:rustc-env=DYLD_LIBRARY_PATH={}", ld_path);
- }
- _ => {
- // do nothing
- }
- }
-}
diff --git a/rust/experimental/server/src/store/hdfs.rs b/rust/experimental/server/src/store/hdfs.rs
index c36af4732..78d903543 100644
--- a/rust/experimental/server/src/store/hdfs.rs
+++ b/rust/experimental/server/src/store/hdfs.rs
@@ -31,20 +31,16 @@ use await_tree::InstrumentAwait;
use bytes::{BufMut, Bytes, BytesMut};
use dashmap::DashMap;
-use futures::AsyncWriteExt;
-use hdrs::{Client, ClientBuilder};
-use log::{error, info};
+use log::info;
use std::path::Path;
+use hdfs_native::{Client, WriteOptions};
use std::sync::Arc;
-use std::{env, io};
use tokio::sync::{Mutex, Semaphore};
use tracing::debug;
-use url::Url;
-
struct PartitionCachedMeta {
is_file_created: bool,
data_len: i64,
@@ -67,7 +63,7 @@ impl Default for PartitionCachedMeta {
pub struct HdfsStore {
root: String,
- filesystem: Box<Hdrs>,
+ filesystem: Box<HdfsNativeClient>,
concurrency_access_limiter: Semaphore,
partition_file_locks: DashMap<String, Arc<Mutex<()>>>,
@@ -81,24 +77,11 @@ impl Persistent for HdfsStore {}
impl HdfsStore {
pub fn from(conf: HdfsStoreConfig) -> Self {
let data_path = conf.data_path;
- let data_url = Url::parse(data_path.as_str()).unwrap();
-
- let name_node = match data_url.host_str() {
- Some(host) => format!("{}://{}", data_url.scheme(), host),
- _ => "default".to_string(),
- };
- let krb5_cache = env::var("KRB5CACHE_PATH").map_or(None, |v| Some(v));
- let hdfs_user = env::var("HDFS_USER").map_or(None, |v| Some(v));
- let fs = Hdrs::new(name_node.as_str(), krb5_cache, hdfs_user);
- if fs.is_err() {
- error!("Errors on connecting the hdfs. error: {:?}", fs.err());
- panic!();
- }
- let filesystem = fs.unwrap();
+ let filesystem = HdfsNativeClient::new();
HdfsStore {
- root: data_url.to_string(),
+ root: data_path,
filesystem: Box::new(filesystem),
partition_file_locks: DashMap::new(),
concurrency_access_limiter:
Semaphore::new(conf.max_concurrency.unwrap_or(1) as usize),
@@ -110,6 +93,11 @@ impl HdfsStore {
format!("{}/{}/", &self.root, app_id)
}
+ /// the dir created with app_id/shuffle_id
+ fn get_shuffle_dir(&self, app_id: &str, shuffle_id: i32) -> String {
+ format!("{}/{}/{}/", &self.root, app_id, shuffle_id)
+ }
+
fn get_file_path_by_uid(&self, uid: &PartitionedUId) -> (String, String) {
let app_id = &uid.app_id;
let shuffle_id = &uid.shuffle_id;
@@ -252,12 +240,16 @@ impl Store for HdfsStore {
async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
let app_id = ctx.app_id;
- let app_dir = self.get_app_dir(app_id.as_str());
+
+ let dir = match ctx.shuffle_id {
+ Some(shuffle_id) => self.get_shuffle_dir(app_id.as_str(),
shuffle_id),
+ _ => self.get_app_dir(app_id.as_str()),
+ };
let keys_to_delete: Vec<_> = self
.partition_file_locks
.iter()
- .filter(|entry| entry.key().contains(app_dir.as_str()))
+ .filter(|entry| entry.key().starts_with(dir.as_str()))
.map(|entry| entry.key().to_string())
.collect();
@@ -266,11 +258,11 @@ impl Store for HdfsStore {
self.partition_cached_meta.remove(&deleted_key);
}
- info!("The hdfs data for {} has been deleted", &app_dir);
- self.filesystem.delete_dir(app_dir.as_str()).await
+ info!("The hdfs data for {} has been deleted", &dir);
+ self.filesystem.delete_dir(dir.as_str()).await
}
- async fn is_healthy(&self) -> anyhow::Result<bool> {
+ async fn is_healthy(&self) -> Result<bool> {
Ok(true)
}
}
@@ -285,73 +277,48 @@ trait HdfsDelegator {
async fn delete_dir(&self, dir: &str) -> Result<()>;
}
-struct Hdrs {
+struct HdfsNativeClient {
client: Client,
}
-#[async_trait]
-impl HdfsDelegator for Hdrs {
- async fn touch(&self, file_path: &str) -> Result<()> {
- let metadata = self.client.metadata(file_path);
- if metadata.is_err() && metadata.unwrap_err().kind() ==
io::ErrorKind::NotFound {
- debug!("Creating the file, path: {}", file_path);
- let mut write = self
- .client
- .open_file()
- .create(true)
- .write(true)
- .async_open(file_path)
- .await?;
- write.write("".as_bytes()).await?;
- write.flush().await?;
- write.close().await?;
- debug!("the file: {} is created!", file_path);
- }
- Ok(())
+impl HdfsNativeClient {
+ fn new() -> Self {
+ let client = Client::default();
+ Self { client }
}
+}
- async fn append(&self, file_path: &str, data: Bytes) -> Result<()> {
- let mut data_writer = self
- .client
- .open_file()
- .create(true)
- .append(true)
- .async_open(file_path)
+#[async_trait]
+impl HdfsDelegator for HdfsNativeClient {
+ async fn touch(&self, file_path: &str) -> Result<()> {
+ self.client
+ .create(file_path, WriteOptions::default())
+ .await?
+ .close()
.await?;
- data_writer.write_all(data.as_ref()).await?;
- data_writer.flush().await?;
- data_writer.close().await?;
- debug!("data has been flushed. path: {}", file_path);
Ok(())
}
- async fn create_dir(&self, dir: &str) -> Result<()> {
- self.client.create_dir(dir)?;
+ async fn append(&self, file_path: &str, data: Bytes) -> Result<()> {
+ let mut file_writer = self.client.append(file_path).await?;
+ file_writer.write(data).await?;
+ file_writer.close().await?;
Ok(())
}
async fn len(&self, file_path: &str) -> Result<u64> {
- let meta = self.client.metadata(file_path)?;
- Ok(meta.len())
+ let file_info = self.client.get_file_info(file_path).await?;
+ Ok(file_info.length as u64)
}
- async fn delete_dir(&self, dir: &str) -> Result<()> {
- self.client.remove_dir_all(dir)?;
+ async fn create_dir(&self, dir: &str) -> Result<()> {
+ let _ = self.client.mkdirs(dir, 777, true).await?;
Ok(())
}
-}
-impl Hdrs {
- fn new(name_node: &str, krb5_cache: Option<String>, user: Option<String>)
-> Result<Self> {
- let mut builder = ClientBuilder::new(name_node);
- if krb5_cache.is_some() {
- builder =
builder.with_kerberos_ticket_cache_path(krb5_cache.unwrap().as_str());
- }
- if user.is_some() {
- builder = builder.with_user(user.unwrap().as_str())
- }
- let client = builder.connect()?;
- Ok(Hdrs { client })
+ async fn delete_dir(&self, dir: &str) -> Result<()> {
+ self.client.delete(dir, true).await?;
+ Ok(())
}
}