This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 47c47b7e6 feat(core): add Alluxio e2e test (#3573)
47c47b7e6 is described below

commit 47c47b7e6c224fe27569771a72433d1bc304ea14
Author: hoslo <[email protected]>
AuthorDate: Tue Nov 14 00:00:38 2023 +0800

    feat(core): add Alluxio e2e test (#3573)
---
 .env.example                                |  3 ++
 .github/services/alluxio/alluxio/action.yml | 35 ++++++++++++++
 .github/workflows/ci.yml                    |  1 +
 bindings/java/Cargo.toml                    |  2 +
 bindings/nodejs/Cargo.toml                  |  2 +
 bindings/python/Cargo.toml                  |  2 +
 core/src/services/alluxio/backend.rs        |  7 ++-
 core/src/services/alluxio/core.rs           | 66 +++++++++++++++++---------
 core/src/services/alluxio/error.rs          | 57 ++++++++++++++++-------
 core/src/services/alluxio/pager.rs          | 42 ++++++++++++-----
 core/src/types/operator/builder.rs          |  2 +
 core/src/types/scheme.rs                    |  3 ++
 fixtures/alluxio/docker-compose-alluxio.yml | 72 +++++++++++++++++++++++++++++
 13 files changed, 241 insertions(+), 53 deletions(-)

diff --git a/.env.example b/.env.example
index 32eee467b..7f11ebf03 100644
--- a/.env.example
+++ b/.env.example
@@ -166,3 +166,6 @@ OPENDAL_GRIDFS_CONNECTION_STRING=mongodb://localhost:27017
 OPENDAL_GRIDFS_DATABASE=<database>
 OPENDAL_GRIDFS_BUCKET=<fs>
 OPENDAL_GRIDFS_CHUNK_SIZE=<chunk_size>
+# alluxio
+OPENDAL_ALLUXIO_ENDPOINT=<endpoint>
+OPENDAL_ALLUXIO_ROOT=/path/to/dir
diff --git a/.github/services/alluxio/alluxio/action.yml b/.github/services/alluxio/alluxio/action.yml
new file mode 100644
index 000000000..25c05761c
--- /dev/null
+++ b/.github/services/alluxio/alluxio/action.yml
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: alluxio
+description: "Behavior test for Alluxio"
+
+runs:
+  using: "composite"
+  steps:
+    - name: Setup Alluxio service
+      shell: bash
+      working-directory: fixtures/alluxio
+      run: |
+        docker compose -f docker-compose-alluxio.yml up -d --wait
+    - name: Set environment variables
+      shell: bash
+      run: |
+        cat << EOF >> $GITHUB_ENV
+        OPENDAL_ALLUXIO_ENDPOINT=http://127.0.0.1:39999
+        OPENDAL_ALLUXIO_ROOT=/
+        EOF
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index de199f5d3..5b9acf14e 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -202,6 +202,7 @@ jobs:
         shell: bash
         run: |
           FEATURES=(
+            services-alluxio
             services-azblob
             services-azdls
             services-cacache
diff --git a/bindings/java/Cargo.toml b/bindings/java/Cargo.toml
index c93d9bfcf..2fcff3da7 100644
--- a/bindings/java/Cargo.toml
+++ b/bindings/java/Cargo.toml
@@ -85,6 +85,7 @@ services-all = [
   "services-azfile",
   "services-libsql",
   "services-swift",
+  "services-alluxio",
 ]
 
 # Default services provided by opendal.
@@ -135,6 +136,7 @@ services-swift = ["opendal/services-swift"]
 services-tikv = ["opendal/services-tikv"]
 services-vercel-artifacts = ["opendal/services-vercel-artifacts"]
 services-wasabi = ["opendal/services-wasabi"]
+services-alluxio = ["opendal/services-alluxio"]
 
 [dependencies]
 anyhow = "1.0.71"
diff --git a/bindings/nodejs/Cargo.toml b/bindings/nodejs/Cargo.toml
index 9a29697a8..92d7c451c 100644
--- a/bindings/nodejs/Cargo.toml
+++ b/bindings/nodejs/Cargo.toml
@@ -81,6 +81,7 @@ services-all = [
   "services-gridfs",
   "services-sqlite",
   "services-libsql",
+  "services-alluxio",
 ]
 
 # Default services provided by opendal.
@@ -131,6 +132,7 @@ services-swift = ["opendal/services-swift"]
 services-tikv = ["opendal/services-tikv"]
 services-vercel-artifacts = ["opendal/services-vercel-artifacts"]
 services-wasabi = ["opendal/services-wasabi"]
+services-alluxio = ["opendal/services-alluxio"]
 
 [lib]
 crate-type = ["cdylib"]
diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml
index d34be0d40..553575f71 100644
--- a/bindings/python/Cargo.toml
+++ b/bindings/python/Cargo.toml
@@ -81,6 +81,7 @@ services-all = [
   "services-gridfs",
   "services-sqlite",
   "services-libsql",
+  "services-alluxio",
 ]
 
 # Default services provided by opendal.
@@ -131,6 +132,7 @@ services-swift = ["opendal/services-swift"]
 services-tikv = ["opendal/services-tikv"]
 services-vercel-artifacts = ["opendal/services-vercel-artifacts"]
 services-wasabi = ["opendal/services-wasabi"]
+services-alluxio = ["opendal/services-alluxio"]
 
 [lib]
 crate-type = ["cdylib"]
diff --git a/core/src/services/alluxio/backend.rs b/core/src/services/alluxio/backend.rs
index 25046e068..58d33c07a 100644
--- a/core/src/services/alluxio/backend.rs
+++ b/core/src/services/alluxio/backend.rs
@@ -152,7 +152,7 @@ impl Builder for AlluxioBuilder {
             Some(endpoint) => Ok(endpoint.clone()),
             None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
                 .with_operation("Builder::build")
-                .with_context("service", Scheme::Azfile)),
+                .with_context("service", Scheme::Alluxio)),
         }?;
         debug!("backend use endpoint {}", &endpoint);
 
@@ -161,7 +161,7 @@ impl Builder for AlluxioBuilder {
         } else {
             HttpClient::new().map_err(|err| {
                 err.with_operation("Builder::build")
-                    .with_context("service", Scheme::S3)
+                    .with_context("service", Scheme::Alluxio)
             })?
         };
 
@@ -204,7 +204,6 @@ impl Accessor for AlluxioBackend {
 
                 create_dir: true,
                 delete: true,
-                rename: true,
 
                 list: true,
                 list_without_recursive: true,
@@ -230,7 +229,7 @@ impl Accessor for AlluxioBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        let w = AlluxioWriter::new(self.core.clone(), args.clone(), path.to_string());
+        let w = AlluxioWriter::new(self.core.clone(), args, path.to_string());
         let w = OneShotWriter::new(w);
 
         Ok((RpWrite::default(), w))
diff --git a/core/src/services/alluxio/core.rs b/core/src/services/alluxio/core.rs
index b746e4b1a..86bf36923 100644
--- a/core/src/services/alluxio/core.rs
+++ b/core/src/services/alluxio/core.rs
@@ -38,9 +38,12 @@ struct CreateFileRequest {
 }
 
 #[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
 struct CreateDirRequest {
     #[serde(skip_serializing_if = "Option::is_none")]
     recursive: Option<bool>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    allow_exists: Option<bool>,
 }
 
 /// Metadata of alluxio object
@@ -97,17 +100,20 @@ impl Debug for AlluxioCore {
 
 impl AlluxioCore {
     pub async fn create_dir(&self, path: &str) -> Result<()> {
-        let path = build_abs_path(&self.root, path);
+        let path = build_rooted_abs_path(&self.root, path);
 
         let r = CreateDirRequest {
             recursive: Some(true),
+            allow_exists: Some(true),
         };
 
         let body = serde_json::to_vec(&r).map_err(new_json_serialize_error)?;
         let body = bytes::Bytes::from(body);
+
         let mut req = Request::post(format!(
-            "{}/api/v1/paths//{}/create-directory",
-            self.endpoint, path
+            "{}/api/v1/paths/{}/create-directory",
+            self.endpoint,
+            percent_encode_path(&path)
         ));
 
         req = req.header("Content-Type", "application/json");
@@ -126,7 +132,7 @@ impl AlluxioCore {
     }
 
     pub async fn create_file(&self, path: &str) -> Result<u64> {
-        let path = build_abs_path(&self.root, path);
+        let path = build_rooted_abs_path(&self.root, path);
 
         let r = CreateFileRequest {
             recursive: Some(true),
@@ -135,8 +141,9 @@ impl AlluxioCore {
         let body = serde_json::to_vec(&r).map_err(new_json_serialize_error)?;
         let body = bytes::Bytes::from(body);
         let mut req = Request::post(format!(
-            "{}/api/v1/paths//{}/create-file",
-            self.endpoint, path
+            "{}/api/v1/paths/{}/create-file",
+            self.endpoint,
+            percent_encode_path(&path)
         ));
 
         req = req.header("Content-Type", "application/json");
@@ -160,11 +167,12 @@ impl AlluxioCore {
     }
 
     pub(super) async fn open_file(&self, path: &str) -> Result<u64> {
-        let path = build_abs_path(&self.root, path);
+        let path = build_rooted_abs_path(&self.root, path);
 
         let req = Request::post(format!(
-            "{}/api/v1/paths//{}/open-file",
-            self.endpoint, path
+            "{}/api/v1/paths/{}/open-file",
+            self.endpoint,
+            percent_encode_path(&path)
         ));
         let req = req
             .body(AsyncBody::Empty)
@@ -185,9 +193,13 @@ impl AlluxioCore {
     }
 
     pub(super) async fn delete(&self, path: &str) -> Result<()> {
-        let path = build_abs_path(&self.root, path);
+        let path = build_rooted_abs_path(&self.root, path);
 
-        let req = Request::post(format!("{}/api/v1/paths//{}/delete", self.endpoint, path));
+        let req = Request::post(format!(
+            "{}/api/v1/paths/{}/delete",
+            self.endpoint,
+            percent_encode_path(&path)
+        ));
         let req = req
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
@@ -197,16 +209,25 @@ impl AlluxioCore {
 
         match status {
             StatusCode::OK => Ok(()),
-            _ => Err(parse_error(resp).await?),
+            _ => {
+                let err = parse_error(resp).await?;
+                if err.kind() == ErrorKind::NotFound {
+                    return Ok(());
+                }
+                Err(err)
+            }
         }
     }
 
     pub(super) async fn rename(&self, path: &str, dst: &str) -> Result<()> {
-        let path = build_abs_path(&self.root, path);
+        let path = build_rooted_abs_path(&self.root, path);
+        let dst = build_rooted_abs_path(&self.root, dst);
 
         let req = Request::post(format!(
-            "{}/api/v1/paths//{}/rename?dst=/{}",
-            self.endpoint, path, dst
+            "{}/api/v1/paths/{}/rename?dst={}",
+            self.endpoint,
+            percent_encode_path(&path),
+            percent_encode_path(&dst)
         ));
 
         let req = req
@@ -224,16 +245,18 @@ impl AlluxioCore {
     }
 
     pub(super) async fn get_status(&self, path: &str) -> Result<FileInfo> {
-        let path = build_abs_path(&self.root, path);
+        let path = build_rooted_abs_path(&self.root, path);
 
         let req = Request::post(format!(
-            "{}/api/v1/paths//{}/get-status",
-            self.endpoint, path
+            "{}/api/v1/paths/{}/get-status",
+            self.endpoint,
+            percent_encode_path(&path)
         ));
 
         let req = req
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
+
         let resp = self.client.send(req).await?;
 
         let status = resp.status();
@@ -250,11 +273,12 @@ impl AlluxioCore {
     }
 
     pub(super) async fn list_status(&self, path: &str) -> Result<Vec<FileInfo>> {
-        let path = build_abs_path(&self.root, path);
+        let path = build_rooted_abs_path(&self.root, path);
 
         let req = Request::post(format!(
-            "{}/api/v1/paths//{}/list-status",
-            self.endpoint, path
+            "{}/api/v1/paths/{}/list-status",
+            self.endpoint,
+            percent_encode_path(&path)
         ));
 
         let req = req
diff --git a/core/src/services/alluxio/error.rs b/core/src/services/alluxio/error.rs
index 8a8941b10..0f19d7a29 100644
--- a/core/src/services/alluxio/error.rs
+++ b/core/src/services/alluxio/error.rs
@@ -46,9 +46,9 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
 
     if let Some(alluxio_err) = alluxio_err {
         kind = match alluxio_err.status_code.as_str() {
-            "AlreadyExists" => ErrorKind::AlreadyExists,
-            "NotFound" => ErrorKind::NotFound,
-            "InvalidArgument" => ErrorKind::InvalidInput,
+            "ALREADY_EXISTS" => ErrorKind::AlreadyExists,
+            "NOT_FOUND" => ErrorKind::NotFound,
+            "INVALID_ARGUMENT" => ErrorKind::InvalidInput,
             _ => ErrorKind::Unexpected,
         }
     }
@@ -63,23 +63,46 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use futures::stream;
+    use http::StatusCode;
 
     /// Error response example is from https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
-    #[test]
-    fn test_parse_error() {
-        let bs = bytes::Bytes::from(
-            r#"
-            {
-                "statusCode": "AlreadyExists",
-                "message": "The resource you requested already exist"
-            }
-"#,
-        );
+    #[tokio::test]
+    async fn test_parse_error() {
+        let err_res = vec![
+            (
+                r#"{"statusCode":"ALREADY_EXISTS","message":"The resource you requested already exist"}"#,
+                ErrorKind::AlreadyExists,
+            ),
+            (
+                r#"{"statusCode":"NOT_FOUND","message":"The resource you requested does not exist"}"#,
+                ErrorKind::NotFound,
+            ),
+            (
+                r#"{"statusCode":"INVALID_ARGUMENT","message":"The argument you provided is invalid"}"#,
+                ErrorKind::InvalidInput,
+            ),
+            (
+                r#"{"statusCode":"INTERNAL_SERVER_ERROR","message":"Internal server error"}"#,
+                ErrorKind::Unexpected,
+            ),
+        ];
 
-        let out: AlluxioError = serde_json::from_reader(bs.reader()).expect("must success");
-        println!("{out:?}");
+        for res in err_res {
+            let bs = bytes::Bytes::from(res.0);
+            let body = IncomingAsyncBody::new(
+                Box::new(oio::into_stream(stream::iter(vec![Ok(bs.clone())]))),
+                None,
+            );
+            let resp = Response::builder()
+                .status(StatusCode::INTERNAL_SERVER_ERROR)
+                .body(body)
+                .unwrap();
 
-        assert_eq!(out.status_code, "AlreadyExists");
-        assert_eq!(out.message, "The resource you requested already exist");
+            let err = parse_error(resp).await;
+
+            assert!(err.is_ok());
+            assert_eq!(err.unwrap().kind(), res.1);
+        }
     }
 }
diff --git a/core/src/services/alluxio/pager.rs b/core/src/services/alluxio/pager.rs
index 148e01b79..64483751b 100644
--- a/core/src/services/alluxio/pager.rs
+++ b/core/src/services/alluxio/pager.rs
@@ -22,6 +22,7 @@ use async_trait::async_trait;
 use super::core::AlluxioCore;
 use crate::raw::oio::Entry;
 use crate::raw::*;
+use crate::ErrorKind;
 use crate::Result;
 
 pub struct AlluxioPager {
@@ -49,20 +50,39 @@ impl oio::Page for AlluxioPager {
             return Ok(None);
         }
 
-        let file_infos = self.core.list_status(&self.path).await?;
+        let result = self.core.list_status(&self.path).await;
 
-        let mut entries = vec![];
-        for file_info in file_infos {
-            let path: String = file_info.path.clone();
-            entries.push(Entry::new(&path, file_info.try_into()?));
-        }
+        match result {
+            Ok(file_infos) => {
+                let mut entries = vec![];
+                for file_info in file_infos {
+                    let path: String = file_info.path.clone();
+                    let path = if file_info.folder {
+                        format!("{}/", path)
+                    } else {
+                        path
+                    };
+                    entries.push(Entry::new(
+                        &build_rel_path(&self.core.root, &path),
+                        file_info.try_into()?,
+                    ));
+                }
 
-        if entries.is_empty() {
-            return Ok(None);
-        }
+                if entries.is_empty() {
+                    return Ok(None);
+                }
 
-        self.done = true;
+                self.done = true;
 
-        Ok(Some(entries))
+                Ok(Some(entries))
+            }
+            Err(e) => {
+                if e.kind() == ErrorKind::NotFound {
+                    self.done = true;
+                    return Ok(None);
+                }
+                Err(e)
+            }
+        }
     }
 }
diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs
index 3efbcddf7..f926bd570 100644
--- a/core/src/types/operator/builder.rs
+++ b/core/src/types/operator/builder.rs
@@ -155,6 +155,8 @@ impl Operator {
         let op = match scheme {
             #[cfg(feature = "services-atomicserver")]
             Scheme::Atomicserver => Self::from_map::<services::Atomicserver>(map)?.finish(),
+            #[cfg(feature = "services-alluxio")]
+            Scheme::Alluxio => Self::from_map::<services::Alluxio>(map)?.finish(),
             #[cfg(feature = "services-azblob")]
             Scheme::Azblob => Self::from_map::<services::Azblob>(map)?.finish(),
             #[cfg(feature = "services-azdls")]
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index 224e15b7a..f0bf62b73 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -165,6 +165,8 @@ impl Scheme {
         HashSet::from([
             #[cfg(feature = "services-atomicserver")]
             Scheme::Atomicserver,
+            #[cfg(feature = "services-alluxio")]
+            Scheme::Alluxio,
             #[cfg(feature = "services-azblob")]
             Scheme::Azblob,
             #[cfg(feature = "services-azdls")]
@@ -275,6 +277,7 @@ impl FromStr for Scheme {
         match s.as_str() {
             "atomicserver" => Ok(Scheme::Atomicserver),
             "azblob" => Ok(Scheme::Azblob),
+            "alluxio" => Ok(Scheme::Alluxio),
             // Notes:
             //
             // OpenDAL used to call `azdls` as `azdfs`, we keep it for backward compatibility.
diff --git a/fixtures/alluxio/docker-compose-alluxio.yml b/fixtures/alluxio/docker-compose-alluxio.yml
new file mode 100644
index 000000000..d725fe8c3
--- /dev/null
+++ b/fixtures/alluxio/docker-compose-alluxio.yml
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: "3.8"
+services:
+  alluxio-master:
+    image: alluxio/alluxio:2.9.3
+    ports:
+      - 19999:19999
+      - 19998:19998
+    environment:
+      ALLUXIO_JAVA_OPTS:  -Dalluxio.master.hostname=alluxio-master -Dalluxio.master.mount.table.root.ufs=/opt/alluxio/underFSStorage
+    command: master
+    networks:
+      - alluxio_network
+    healthcheck:
+      test: ["CMD", "curl", "-i", "http://localhost:19999/"]
+      interval: 3s
+      timeout: 20s
+      retries: 10
+
+  alluxio-proxy:
+    image: alluxio/alluxio:2.9.3
+    ports:
+      - 39999:39999
+    environment:
+      ALLUXIO_JAVA_OPTS: -Dalluxio.master.hostname=alluxio-master -Dalluxio.proxy.s3.v2.version.enabled=false -Dalluxio.proxy.s3.v2.async.processing.enabled=false
+    command: proxy
+    networks:
+      - alluxio_network
+    healthcheck:
+      test: ["CMD", "curl", "-i", "http://localhost:39999/"]
+      interval: 3s
+      timeout: 20s
+      retries: 10
+
+  alluxio-worker:
+    image: alluxio/alluxio:2.9.3
+    ports:
+      - 29999:29999
+      - 30000:30000
+    shm_size: 1gb
+    environment:
+      ALLUXIO_JAVA_OPTS:  -Dalluxio.worker.ramdisk.size=1G     -Dalluxio.master.hostname=alluxio-master  -Dalluxio.worker.hostname=alluxio-worker
+    command: worker
+    networks:
+      - alluxio_network
+    healthcheck:
+      test: ["CMD", "curl", "-i", "http://localhost:30000/"]
+      interval: 3s
+      timeout: 20s
+      retries: 10
+
+
+networks:
+  alluxio_network:
+    driver: bridge
+    
\ No newline at end of file

Reply via email to