This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 602611cc1 [#1414] feat(rust): introduce native hdfs client (#1415)
602611cc1 is described below

commit 602611cc1adff995ab23b5e3776b1600f258d095
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Jan 4 10:19:10 2024 +0800

    [#1414] feat(rust): introduce native hdfs client (#1415)
    
    ### What changes were proposed in this pull request?
    
    Introduce the hdfs-native crate as the native HDFS client integration.
    
    ### Why are the changes needed?
    
    For: #1414
    
    A native Rust HDFS client simplifies the Hadoop dependency and avoids the JNI 
invocation cost.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Internal tests on Hadoop 3.1.2
---
 .github/workflows/rust.yml                 |   9 +-
 rust/experimental/server/Cargo.lock        | 302 ++++++++++++++++++-----------
 rust/experimental/server/Cargo.toml        |   8 +-
 rust/experimental/server/README.md         |  15 +-
 rust/experimental/server/build.rs          |  42 +---
 rust/experimental/server/src/store/hdfs.rs | 119 ++++--------
 6 files changed, 250 insertions(+), 245 deletions(-)

diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index e3d2273a3..29c60fcbd 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -31,11 +31,10 @@ jobs:
           - "memory-prof"
     runs-on: ubuntu-latest
     steps:
-      - name: Set up JDK 8
-        uses: actions/setup-java@v3
-        with:
-          java-version: 8
-          distribution: 'temurin'
+
+      - name: Install native libs
+        run: sudo apt-get install -y libkrb5-dev libgsasl-dev
+        
       - name: Install Protoc
         uses: arduino/setup-protoc@v2
         with:
diff --git a/rust/experimental/server/Cargo.lock 
b/rust/experimental/server/Cargo.lock
index 6c0466225..94a281877 100644
--- a/rust/experimental/server/Cargo.lock
+++ b/rust/experimental/server/Cargo.lock
@@ -61,21 +61,6 @@ dependencies = [
  "futures-core",
 ]
 
-[[package]]
-name = "async-lock"
-version = "2.8.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b"
-dependencies = [
- "event-listener",
-]
-
-[[package]]
-name = "async-task"
-version = "4.4.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae"
-
 [[package]]
 name = "async-trait"
 version = "0.1.73"
@@ -87,12 +72,6 @@ dependencies = [
  "syn 2.0.37",
 ]
 
-[[package]]
-name = "atomic-waker"
-version = "1.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "1181e1e0d1fce796a03db1ae795d67167da795f9cf4a39c37589e85ef57f26d3"
-
 [[package]]
 name = "atty"
 version = "0.2.14"
@@ -205,6 +184,28 @@ version = "0.21.4"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2"
 
+[[package]]
+name = "bindgen"
+version = "0.64.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "c4243e6031260db77ede97ad86c27e501d646a27ab57b59a574f725d98ab1fb4"
+dependencies = [
+ "bitflags 1.3.2",
+ "cexpr",
+ "clang-sys",
+ "lazy_static",
+ "lazycell",
+ "log",
+ "peeking_take_while",
+ "proc-macro2",
+ "quote",
+ "regex",
+ "rustc-hash",
+ "shlex",
+ "syn 1.0.109",
+ "which",
+]
+
 [[package]]
 name = "bitflags"
 version = "1.3.2"
@@ -226,21 +227,6 @@ dependencies = [
  "generic-array",
 ]
 
-[[package]]
-name = "blocking"
-version = "1.3.1"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "77231a1c8f801696fc0123ec6150ce92cffb8e164a02afb9c8ddee0e9b65ad65"
-dependencies = [
- "async-channel",
- "async-lock",
- "async-task",
- "atomic-waker",
- "fastrand 1.9.0",
- "futures-lite",
- "log",
-]
-
 [[package]]
 name = "bumpalo"
 version = "3.14.0"
@@ -292,12 +278,32 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "cexpr"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
+dependencies = [
+ "nom",
+]
+
 [[package]]
 name = "cfg-if"
 version = "1.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
 
+[[package]]
+name = "clang-sys"
+version = "1.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "67523a3b4be3ce1989d607a828d036249522dd9c1c8de7f4dd2dae43a37369d1"
+dependencies = [
+ "glob",
+ "libc",
+ "libloading",
+]
+
 [[package]]
 name = "clap"
 version = "3.2.25"
@@ -465,6 +471,21 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "crc"
+version = "3.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "86ec7a15cbe22e59248fc7eadb1907dab5ba09372595da4d73dd805ed4417dfe"
+dependencies = [
+ "crc-catalog",
+]
+
+[[package]]
+name = "crc-catalog"
+version = "2.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
+
 [[package]]
 name = "crc32fast"
 version = "1.3.2"
@@ -791,15 +812,6 @@ dependencies = [
  "once_cell",
 ]
 
-[[package]]
-name = "fastrand"
-version = "1.9.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be"
-dependencies = [
- "instant",
-]
-
 [[package]]
 name = "fastrand"
 version = "2.0.0"
@@ -937,21 +949,6 @@ version = "0.3.28"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
 
-[[package]]
-name = "futures-lite"
-version = "1.13.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce"
-dependencies = [
- "fastrand 1.9.0",
- "futures-core",
- "futures-io",
- "memchr",
- "parking",
- "pin-project-lite",
- "waker-fn",
-]
-
 [[package]]
 name = "futures-macro"
 version = "0.3.28"
@@ -1005,6 +1002,34 @@ dependencies = [
  "pin-project 0.4.30",
 ]
 
+[[package]]
+name = "g2gen"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "fc2c7625b2fc250dd90b63f7887a6bb0f7ec1d714c8278415bea2669ef20820e"
+dependencies = [
+ "g2poly",
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
+[[package]]
+name = "g2p"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "fc36d9bdc3d2da057775a9f4fa7d7b09edab3e0eda7a92cc353358fa63b8519e"
+dependencies = [
+ "g2gen",
+ "g2poly",
+]
+
+[[package]]
+name = "g2poly"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "af6a86e750338603ea2c14b1c0bfe58cd61f87ca67a0021d9334996024608e12"
+
 [[package]]
 name = "generic-array"
 version = "0.14.7"
@@ -1070,13 +1095,29 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a"
 
 [[package]]
-name = "hdfs-sys"
-version = "0.3.0"
+name = "hdfs-native"
+version = "0.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "33e2d5cefba2d51a26b44d2a493f963a32725a0f6593c91be4a610ad449c49cb"
+checksum = "824c00b5e1b0ba3aeb5678debcb78bf3a26d3d6473e7fd0f53d1310c988c1f02"
 dependencies = [
- "cc",
- "java-locator",
+ "base64 0.21.4",
+ "bytes 1.5.0",
+ "crc",
+ "futures",
+ "g2p",
+ "libc",
+ "libgssapi",
+ "log",
+ "num-traits",
+ "prost",
+ "prost-types",
+ "roxmltree",
+ "socket2 0.5.4",
+ "thiserror",
+ "tokio",
+ "url",
+ "users",
+ "uuid",
 ]
 
 [[package]]
@@ -1092,20 +1133,6 @@ dependencies = [
  "num-traits",
 ]
 
-[[package]]
-name = "hdrs"
-version = "0.3.1"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "00a7b465f2d12e45db2af56af13a1cbfe1d5616d54355f5610b26c0961dec7b7"
-dependencies = [
- "blocking",
- "errno",
- "futures",
- "hdfs-sys",
- "libc",
- "log",
-]
-
 [[package]]
 name = "headers"
 version = "0.3.9"
@@ -1321,15 +1348,6 @@ dependencies = [
  "str_stack",
 ]
 
-[[package]]
-name = "instant"
-version = "0.1.12"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
-dependencies = [
- "cfg-if",
-]
-
 [[package]]
 name = "io-lifetimes"
 version = "1.0.11"
@@ -1373,16 +1391,6 @@ version = "1.0.9"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38"
 
-[[package]]
-name = "java-locator"
-version = "0.1.5"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "90003f2fd9c52f212c21d8520f1128da0080bad6fff16b68fe6e7f2f0c3780c2"
-dependencies = [
- "glob",
- "lazy_static",
-]
-
 [[package]]
 name = "js-sys"
 version = "0.3.64"
@@ -1398,12 +1406,49 @@ version = "1.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
 
+[[package]]
+name = "lazycell"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
+
 [[package]]
 name = "libc"
 version = "0.2.148"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b"
 
+[[package]]
+name = "libgssapi"
+version = "0.6.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "9dcfb7f77cbefc242a46ea667491c4f1129712f563cd368623d3f1b261a90e5f"
+dependencies = [
+ "bitflags 2.4.0",
+ "bytes 1.5.0",
+ "lazy_static",
+ "libgssapi-sys",
+]
+
+[[package]]
+name = "libgssapi-sys"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "efdcdd31923aa6280d41ff2636fd93a18cc60fe25983b24887d1a8d24478cbfb"
+dependencies = [
+ "bindgen",
+]
+
+[[package]]
+name = "libloading"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "c571b676ddfc9a8c12f1f3d3085a7b163966a8fd8098a90640953ce5f6170161"
+dependencies = [
+ "cfg-if",
+ "windows-sys 0.48.0",
+]
+
 [[package]]
 name = "linux-raw-sys"
 version = "0.1.4"
@@ -1665,12 +1710,6 @@ version = "3.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f"
 
-[[package]]
-name = "parking"
-version = "2.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e"
-
 [[package]]
 name = "parking_lot"
 version = "0.12.1"
@@ -1700,6 +1739,12 @@ version = "1.0.14"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c"
 
+[[package]]
+name = "peeking_take_while"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
+
 [[package]]
 name = "percent-encoding"
 version = "2.3.0"
@@ -2261,12 +2306,27 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "roxmltree"
+version = "0.18.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "862340e351ce1b271a378ec53f304a5558f7db87f3769dc655a8f6ecbb68b302"
+dependencies = [
+ "xmlparser",
+]
+
 [[package]]
 name = "rustc-demangle"
 version = "0.1.23"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
 
+[[package]]
+name = "rustc-hash"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
+
 [[package]]
 name = "rustix"
 version = "0.36.15"
@@ -2480,6 +2540,12 @@ dependencies = [
  "lazy_static",
 ]
 
+[[package]]
+name = "shlex"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "a7cee0529a6d40f580e7a5e6c495c8fbfe21b7b52795ed4bb5e62cdf92bc6380"
+
 [[package]]
 name = "signal-hook"
 version = "0.3.17"
@@ -2664,7 +2730,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef"
 dependencies = [
  "cfg-if",
- "fastrand 2.0.0",
+ "fastrand",
  "redox_syscall 0.3.5",
  "rustix 0.38.14",
  "windows-sys 0.48.0",
@@ -3208,7 +3274,7 @@ dependencies = [
  "env_logger",
  "fs2",
  "futures",
- "hdrs",
+ "hdfs-native",
  "hyper",
  "log",
  "once_cell",
@@ -3260,11 +3326,23 @@ dependencies = [
  "percent-encoding",
 ]
 
+[[package]]
+name = "users"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "24cc0f6d6f267b73e5a2cadf007ba8f9bc39c6a6f9666f8cf25ea809a153b032"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "uuid"
 version = "1.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d"
+dependencies = [
+ "getrandom",
+]
 
 [[package]]
 name = "valuable"
@@ -3284,12 +3362,6 @@ version = "0.9.4"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
 
-[[package]]
-name = "waker-fn"
-version = "1.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
-
 [[package]]
 name = "want"
 version = "0.3.1"
@@ -3584,3 +3656,9 @@ dependencies = [
  "cfg-if",
  "windows-sys 0.48.0",
 ]
+
+[[package]]
+name = "xmlparser"
+version = "0.13.6"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4"
diff --git a/rust/experimental/server/Cargo.toml 
b/rust/experimental/server/Cargo.toml
index 8c1f97b39..acb082986 100644
--- a/rust/experimental/server/Cargo.toml
+++ b/rust/experimental/server/Cargo.toml
@@ -45,7 +45,7 @@ memory-prof = [
 ]
 
 hdfs = [
-    "dep:hdrs"
+    "dep:hdfs-native"
 ]
 
 [dependencies]
@@ -98,10 +98,10 @@ socket2 = { version="0.4", features = ["all"]}
 cap = "0.1.2"
 spin = "0.9.8"
 
-[dependencies.hdrs]
-version = "0.3.0"
+[dependencies.hdfs-native]
+version = "0.5.0"
 optional = true
-features = ["async_file"]
+features = ["kerberos"]
 
 # jemalloc related optional dependencies
 [dependencies.tikv-jemalloc-ctl]
diff --git a/rust/experimental/server/README.md 
b/rust/experimental/server/README.md
index cc8d1ec8d..73d48e833 100644
--- a/rust/experimental/server/README.md
+++ b/rust/experimental/server/README.md
@@ -133,15 +133,16 @@ push_gateway_endpoint = 
"http://xxxxxxxxxxxxxx/pushgateway";
 
 ### HDFS Setup 
 
-```shell
-export JAVA_HOME=/path/to/java
-export LD_LIBRARY_PATH=${JAVA_HOME}/jre/lib/amd64/server:${LD_LIBRARY_PATH}
-
-export HADOOP_HOME=/path/to/hadoop
-export CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob)
+Thanks to the hdfs-native crate, there is no need to set up JAVA_HOME 
and its related dependencies.
 
+```shell
 cargo build --features hdfs --release
-``` 
+```
+
+```shell
+# configure the kerberos and conf env
+HADOOP_CONF_DIR=/etc/hadoop/conf KRB5_CONFIG=/etc/krb5.conf 
KRB5CCNAME=/tmp/krb5cc_2002 LOG=info ./uniffle-worker
+```
 
 ## Profiling
 
diff --git a/rust/experimental/server/build.rs 
b/rust/experimental/server/build.rs
index 2432c7328..74195cf8c 100644
--- a/rust/experimental/server/build.rs
+++ b/rust/experimental/server/build.rs
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::fs;
 use std::path::Path;
-use std::{env, fs};
 
 fn main() -> Result<(), Box<dyn std::error::Error>> {
     // generate the uniffle code for grpc server
@@ -31,11 +31,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
     // rename the generated filename to uniffle.rs
     rename_file("src/proto/rss.common.rs", "src/proto/uniffle.rs");
 
-    // only setup ld library path in debug mode
-    let profile = std::env::var("PROFILE").unwrap();
-    if profile == "debug" {
-        setup_ld_library_path();
-    }
     Ok(())
 }
 
@@ -46,38 +41,3 @@ fn rename_file(file_path: impl AsRef<Path>, renamed_path: 
impl AsRef<Path>) {
     }
     fs::rename(&f, renamed_path).expect("Errors on renaming file.");
 }
-
-fn setup_ld_library_path() {
-    // java_home is required now to build and test
-    let java_home = env::var("JAVA_HOME").expect("JAVA_HOME must be set");
-    let possible_lib_paths = vec![
-        format!("{java_home}/jre/lib/amd64/server/"),
-        format!("{java_home}/lib/server"),
-        format!("{java_home}/jre/lib/server"),
-        format!("{java_home}/jre/lib/amd64/server"),
-    ];
-    let lib_jvm_path = possible_lib_paths
-        .iter()
-        .find(|&path| {
-            let path = Path::new(&path);
-            path.exists()
-        })
-        .expect("java_home is not valid");
-    match env::consts::OS {
-        "linux" => {
-            let ld_path = 
env::var_os("LD_LIBRARY_PATH").unwrap_or("".parse().unwrap());
-            let ld_path = format!("{}:{}", ld_path.to_str().unwrap(), 
lib_jvm_path);
-            // this might be anti-pattern, but it works for our current setup
-            println!("cargo:rustc-env=LD_LIBRARY_PATH={}", ld_path);
-        }
-        "macos" => {
-            let ld_path = 
env::var_os("DYLD_LIBRARY_PATH").unwrap_or("".parse().unwrap());
-            let ld_path = format!("{}:{}", ld_path.to_str().unwrap(), 
lib_jvm_path);
-            // this might be anti-pattern, but it works for our current setup
-            println!("cargo:rustc-env=DYLD_LIBRARY_PATH={}", ld_path);
-        }
-        _ => {
-            // do nothing
-        }
-    }
-}
diff --git a/rust/experimental/server/src/store/hdfs.rs 
b/rust/experimental/server/src/store/hdfs.rs
index c36af4732..78d903543 100644
--- a/rust/experimental/server/src/store/hdfs.rs
+++ b/rust/experimental/server/src/store/hdfs.rs
@@ -31,20 +31,16 @@ use await_tree::InstrumentAwait;
 use bytes::{BufMut, Bytes, BytesMut};
 use dashmap::DashMap;
 
-use futures::AsyncWriteExt;
-use hdrs::{Client, ClientBuilder};
-use log::{error, info};
+use log::info;
 
 use std::path::Path;
 
+use hdfs_native::{Client, WriteOptions};
 use std::sync::Arc;
-use std::{env, io};
 use tokio::sync::{Mutex, Semaphore};
 
 use tracing::debug;
 
-use url::Url;
-
 struct PartitionCachedMeta {
     is_file_created: bool,
     data_len: i64,
@@ -67,7 +63,7 @@ impl Default for PartitionCachedMeta {
 
 pub struct HdfsStore {
     root: String,
-    filesystem: Box<Hdrs>,
+    filesystem: Box<HdfsNativeClient>,
     concurrency_access_limiter: Semaphore,
 
     partition_file_locks: DashMap<String, Arc<Mutex<()>>>,
@@ -81,24 +77,11 @@ impl Persistent for HdfsStore {}
 impl HdfsStore {
     pub fn from(conf: HdfsStoreConfig) -> Self {
         let data_path = conf.data_path;
-        let data_url = Url::parse(data_path.as_str()).unwrap();
-
-        let name_node = match data_url.host_str() {
-            Some(host) => format!("{}://{}", data_url.scheme(), host),
-            _ => "default".to_string(),
-        };
-        let krb5_cache = env::var("KRB5CACHE_PATH").map_or(None, |v| Some(v));
-        let hdfs_user = env::var("HDFS_USER").map_or(None, |v| Some(v));
 
-        let fs = Hdrs::new(name_node.as_str(), krb5_cache, hdfs_user);
-        if fs.is_err() {
-            error!("Errors on connecting the hdfs. error: {:?}", fs.err());
-            panic!();
-        }
-        let filesystem = fs.unwrap();
+        let filesystem = HdfsNativeClient::new();
 
         HdfsStore {
-            root: data_url.to_string(),
+            root: data_path,
             filesystem: Box::new(filesystem),
             partition_file_locks: DashMap::new(),
             concurrency_access_limiter: 
Semaphore::new(conf.max_concurrency.unwrap_or(1) as usize),
@@ -110,6 +93,11 @@ impl HdfsStore {
         format!("{}/{}/", &self.root, app_id)
     }
 
+    /// the dir created with app_id/shuffle_id
+    fn get_shuffle_dir(&self, app_id: &str, shuffle_id: i32) -> String {
+        format!("{}/{}/{}/", &self.root, app_id, shuffle_id)
+    }
+
     fn get_file_path_by_uid(&self, uid: &PartitionedUId) -> (String, String) {
         let app_id = &uid.app_id;
         let shuffle_id = &uid.shuffle_id;
@@ -252,12 +240,16 @@ impl Store for HdfsStore {
 
     async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
         let app_id = ctx.app_id;
-        let app_dir = self.get_app_dir(app_id.as_str());
+
+        let dir = match ctx.shuffle_id {
+            Some(shuffle_id) => self.get_shuffle_dir(app_id.as_str(), 
shuffle_id),
+            _ => self.get_app_dir(app_id.as_str()),
+        };
 
         let keys_to_delete: Vec<_> = self
             .partition_file_locks
             .iter()
-            .filter(|entry| entry.key().contains(app_dir.as_str()))
+            .filter(|entry| entry.key().starts_with(dir.as_str()))
             .map(|entry| entry.key().to_string())
             .collect();
 
@@ -266,11 +258,11 @@ impl Store for HdfsStore {
             self.partition_cached_meta.remove(&deleted_key);
         }
 
-        info!("The hdfs data for {} has been deleted", &app_dir);
-        self.filesystem.delete_dir(app_dir.as_str()).await
+        info!("The hdfs data for {} has been deleted", &dir);
+        self.filesystem.delete_dir(dir.as_str()).await
     }
 
-    async fn is_healthy(&self) -> anyhow::Result<bool> {
+    async fn is_healthy(&self) -> Result<bool> {
         Ok(true)
     }
 }
@@ -285,73 +277,48 @@ trait HdfsDelegator {
     async fn delete_dir(&self, dir: &str) -> Result<()>;
 }
 
-struct Hdrs {
+struct HdfsNativeClient {
     client: Client,
 }
 
-#[async_trait]
-impl HdfsDelegator for Hdrs {
-    async fn touch(&self, file_path: &str) -> Result<()> {
-        let metadata = self.client.metadata(file_path);
-        if metadata.is_err() && metadata.unwrap_err().kind() == 
io::ErrorKind::NotFound {
-            debug!("Creating the file, path: {}", file_path);
-            let mut write = self
-                .client
-                .open_file()
-                .create(true)
-                .write(true)
-                .async_open(file_path)
-                .await?;
-            write.write("".as_bytes()).await?;
-            write.flush().await?;
-            write.close().await?;
-            debug!("the file: {} is created!", file_path);
-        }
-        Ok(())
+impl HdfsNativeClient {
+    fn new() -> Self {
+        let client = Client::default();
+        Self { client }
     }
+}
 
-    async fn append(&self, file_path: &str, data: Bytes) -> Result<()> {
-        let mut data_writer = self
-            .client
-            .open_file()
-            .create(true)
-            .append(true)
-            .async_open(file_path)
+#[async_trait]
+impl HdfsDelegator for HdfsNativeClient {
+    async fn touch(&self, file_path: &str) -> Result<()> {
+        self.client
+            .create(file_path, WriteOptions::default())
+            .await?
+            .close()
             .await?;
-        data_writer.write_all(data.as_ref()).await?;
-        data_writer.flush().await?;
-        data_writer.close().await?;
-        debug!("data has been flushed. path: {}", file_path);
         Ok(())
     }
 
-    async fn create_dir(&self, dir: &str) -> Result<()> {
-        self.client.create_dir(dir)?;
+    async fn append(&self, file_path: &str, data: Bytes) -> Result<()> {
+        let mut file_writer = self.client.append(file_path).await?;
+        file_writer.write(data).await?;
+        file_writer.close().await?;
         Ok(())
     }
 
     async fn len(&self, file_path: &str) -> Result<u64> {
-        let meta = self.client.metadata(file_path)?;
-        Ok(meta.len())
+        let file_info = self.client.get_file_info(file_path).await?;
+        Ok(file_info.length as u64)
     }
 
-    async fn delete_dir(&self, dir: &str) -> Result<()> {
-        self.client.remove_dir_all(dir)?;
+    async fn create_dir(&self, dir: &str) -> Result<()> {
+        let _ = self.client.mkdirs(dir, 777, true).await?;
         Ok(())
     }
-}
 
-impl Hdrs {
-    fn new(name_node: &str, krb5_cache: Option<String>, user: Option<String>) 
-> Result<Self> {
-        let mut builder = ClientBuilder::new(name_node);
-        if krb5_cache.is_some() {
-            builder = 
builder.with_kerberos_ticket_cache_path(krb5_cache.unwrap().as_str());
-        }
-        if user.is_some() {
-            builder = builder.with_user(user.unwrap().as_str())
-        }
-        let client = builder.connect()?;
-        Ok(Hdrs { client })
+    async fn delete_dir(&self, dir: &str) -> Result<()> {
+        self.client.delete(dir, true).await?;
+        Ok(())
     }
 }
 

Reply via email to