This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 47c47b7e6 feat(core): add Alluxio e2e test (#3573)
47c47b7e6 is described below
commit 47c47b7e6c224fe27569771a72433d1bc304ea14
Author: hoslo <[email protected]>
AuthorDate: Tue Nov 14 00:00:38 2023 +0800
feat(core): add Alluxio e2e test (#3573)
---
.env.example | 3 ++
.github/services/alluxio/alluxio/action.yml | 35 ++++++++++++++
.github/workflows/ci.yml | 1 +
bindings/java/Cargo.toml | 2 +
bindings/nodejs/Cargo.toml | 2 +
bindings/python/Cargo.toml | 2 +
core/src/services/alluxio/backend.rs | 7 ++-
core/src/services/alluxio/core.rs | 66 +++++++++++++++++---------
core/src/services/alluxio/error.rs | 57 ++++++++++++++++-------
core/src/services/alluxio/pager.rs | 42 ++++++++++++-----
core/src/types/operator/builder.rs | 2 +
core/src/types/scheme.rs | 3 ++
fixtures/alluxio/docker-compose-alluxio.yml | 72 +++++++++++++++++++++++++++++
13 files changed, 241 insertions(+), 53 deletions(-)
diff --git a/.env.example b/.env.example
index 32eee467b..7f11ebf03 100644
--- a/.env.example
+++ b/.env.example
@@ -166,3 +166,6 @@ OPENDAL_GRIDFS_CONNECTION_STRING=mongodb://localhost:27017
OPENDAL_GRIDFS_DATABASE=<database>
OPENDAL_GRIDFS_BUCKET=<fs>
OPENDAL_GRIDFS_CHUNK_SIZE=<chunk_size>
+# alluxio
+OPENDAL_ALLUXIO_ENDPOINT=<endpoint>
+OPENDAL_ALLUXIO_ROOT=/path/to/dir
diff --git a/.github/services/alluxio/alluxio/action.yml b/.github/services/alluxio/alluxio/action.yml
new file mode 100644
index 000000000..25c05761c
--- /dev/null
+++ b/.github/services/alluxio/alluxio/action.yml
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: alluxio
+description: "Behavior test for Alluxio"
+
+runs:
+ using: "composite"
+ steps:
+ - name: Setup Alluxio service
+ shell: bash
+ working-directory: fixtures/alluxio
+ run: |
+ docker compose -f docker-compose-alluxio.yml up -d --wait
+ - name: Set environment variables
+ shell: bash
+ run: |
+ cat << EOF >> $GITHUB_ENV
+ OPENDAL_ALLUXIO_ENDPOINT=http://127.0.0.1:39999
+ OPENDAL_ALLUXIO_ROOT=/
+ EOF
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index de199f5d3..5b9acf14e 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -202,6 +202,7 @@ jobs:
shell: bash
run: |
FEATURES=(
+ services-alluxio
services-azblob
services-azdls
services-cacache
diff --git a/bindings/java/Cargo.toml b/bindings/java/Cargo.toml
index c93d9bfcf..2fcff3da7 100644
--- a/bindings/java/Cargo.toml
+++ b/bindings/java/Cargo.toml
@@ -85,6 +85,7 @@ services-all = [
"services-azfile",
"services-libsql",
"services-swift",
+ "services-alluxio",
]
# Default services provided by opendal.
@@ -135,6 +136,7 @@ services-swift = ["opendal/services-swift"]
services-tikv = ["opendal/services-tikv"]
services-vercel-artifacts = ["opendal/services-vercel-artifacts"]
services-wasabi = ["opendal/services-wasabi"]
+services-alluxio = ["opendal/services-alluxio"]
[dependencies]
anyhow = "1.0.71"
diff --git a/bindings/nodejs/Cargo.toml b/bindings/nodejs/Cargo.toml
index 9a29697a8..92d7c451c 100644
--- a/bindings/nodejs/Cargo.toml
+++ b/bindings/nodejs/Cargo.toml
@@ -81,6 +81,7 @@ services-all = [
"services-gridfs",
"services-sqlite",
"services-libsql",
+ "services-alluxio",
]
# Default services provided by opendal.
@@ -131,6 +132,7 @@ services-swift = ["opendal/services-swift"]
services-tikv = ["opendal/services-tikv"]
services-vercel-artifacts = ["opendal/services-vercel-artifacts"]
services-wasabi = ["opendal/services-wasabi"]
+services-alluxio = ["opendal/services-alluxio"]
[lib]
crate-type = ["cdylib"]
diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml
index d34be0d40..553575f71 100644
--- a/bindings/python/Cargo.toml
+++ b/bindings/python/Cargo.toml
@@ -81,6 +81,7 @@ services-all = [
"services-gridfs",
"services-sqlite",
"services-libsql",
+ "services-alluxio",
]
# Default services provided by opendal.
@@ -131,6 +132,7 @@ services-swift = ["opendal/services-swift"]
services-tikv = ["opendal/services-tikv"]
services-vercel-artifacts = ["opendal/services-vercel-artifacts"]
services-wasabi = ["opendal/services-wasabi"]
+services-alluxio = ["opendal/services-alluxio"]
[lib]
crate-type = ["cdylib"]
diff --git a/core/src/services/alluxio/backend.rs b/core/src/services/alluxio/backend.rs
index 25046e068..58d33c07a 100644
--- a/core/src/services/alluxio/backend.rs
+++ b/core/src/services/alluxio/backend.rs
@@ -152,7 +152,7 @@ impl Builder for AlluxioBuilder {
Some(endpoint) => Ok(endpoint.clone()),
None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
.with_operation("Builder::build")
- .with_context("service", Scheme::Azfile)),
+ .with_context("service", Scheme::Alluxio)),
}?;
debug!("backend use endpoint {}", &endpoint);
@@ -161,7 +161,7 @@ impl Builder for AlluxioBuilder {
} else {
HttpClient::new().map_err(|err| {
err.with_operation("Builder::build")
- .with_context("service", Scheme::S3)
+ .with_context("service", Scheme::Alluxio)
})?
};
@@ -204,7 +204,6 @@ impl Accessor for AlluxioBackend {
create_dir: true,
delete: true,
- rename: true,
list: true,
list_without_recursive: true,
@@ -230,7 +229,7 @@ impl Accessor for AlluxioBackend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
- let w = AlluxioWriter::new(self.core.clone(), args.clone(), path.to_string());
+ let w = AlluxioWriter::new(self.core.clone(), args, path.to_string());
let w = OneShotWriter::new(w);
Ok((RpWrite::default(), w))
diff --git a/core/src/services/alluxio/core.rs b/core/src/services/alluxio/core.rs
index b746e4b1a..86bf36923 100644
--- a/core/src/services/alluxio/core.rs
+++ b/core/src/services/alluxio/core.rs
@@ -38,9 +38,12 @@ struct CreateFileRequest {
}
#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
struct CreateDirRequest {
#[serde(skip_serializing_if = "Option::is_none")]
recursive: Option<bool>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ allow_exists: Option<bool>,
}
/// Metadata of alluxio object
@@ -97,17 +100,20 @@ impl Debug for AlluxioCore {
impl AlluxioCore {
pub async fn create_dir(&self, path: &str) -> Result<()> {
- let path = build_abs_path(&self.root, path);
+ let path = build_rooted_abs_path(&self.root, path);
let r = CreateDirRequest {
recursive: Some(true),
+ allow_exists: Some(true),
};
let body = serde_json::to_vec(&r).map_err(new_json_serialize_error)?;
let body = bytes::Bytes::from(body);
+
let mut req = Request::post(format!(
- "{}/api/v1/paths//{}/create-directory",
- self.endpoint, path
+ "{}/api/v1/paths/{}/create-directory",
+ self.endpoint,
+ percent_encode_path(&path)
));
req = req.header("Content-Type", "application/json");
@@ -126,7 +132,7 @@ impl AlluxioCore {
}
pub async fn create_file(&self, path: &str) -> Result<u64> {
- let path = build_abs_path(&self.root, path);
+ let path = build_rooted_abs_path(&self.root, path);
let r = CreateFileRequest {
recursive: Some(true),
@@ -135,8 +141,9 @@ impl AlluxioCore {
let body = serde_json::to_vec(&r).map_err(new_json_serialize_error)?;
let body = bytes::Bytes::from(body);
let mut req = Request::post(format!(
- "{}/api/v1/paths//{}/create-file",
- self.endpoint, path
+ "{}/api/v1/paths/{}/create-file",
+ self.endpoint,
+ percent_encode_path(&path)
));
req = req.header("Content-Type", "application/json");
@@ -160,11 +167,12 @@ impl AlluxioCore {
}
pub(super) async fn open_file(&self, path: &str) -> Result<u64> {
- let path = build_abs_path(&self.root, path);
+ let path = build_rooted_abs_path(&self.root, path);
let req = Request::post(format!(
- "{}/api/v1/paths//{}/open-file",
- self.endpoint, path
+ "{}/api/v1/paths/{}/open-file",
+ self.endpoint,
+ percent_encode_path(&path)
));
let req = req
.body(AsyncBody::Empty)
@@ -185,9 +193,13 @@ impl AlluxioCore {
}
pub(super) async fn delete(&self, path: &str) -> Result<()> {
- let path = build_abs_path(&self.root, path);
+ let path = build_rooted_abs_path(&self.root, path);
- let req = Request::post(format!("{}/api/v1/paths//{}/delete", self.endpoint, path));
+ let req = Request::post(format!(
+ "{}/api/v1/paths/{}/delete",
+ self.endpoint,
+ percent_encode_path(&path)
+ ));
let req = req
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
@@ -197,16 +209,25 @@ impl AlluxioCore {
match status {
StatusCode::OK => Ok(()),
- _ => Err(parse_error(resp).await?),
+ _ => {
+ let err = parse_error(resp).await?;
+ if err.kind() == ErrorKind::NotFound {
+ return Ok(());
+ }
+ Err(err)
+ }
}
}
pub(super) async fn rename(&self, path: &str, dst: &str) -> Result<()> {
- let path = build_abs_path(&self.root, path);
+ let path = build_rooted_abs_path(&self.root, path);
+ let dst = build_rooted_abs_path(&self.root, dst);
let req = Request::post(format!(
- "{}/api/v1/paths//{}/rename?dst=/{}",
- self.endpoint, path, dst
+ "{}/api/v1/paths/{}/rename?dst={}",
+ self.endpoint,
+ percent_encode_path(&path),
+ percent_encode_path(&dst)
));
let req = req
@@ -224,16 +245,18 @@ impl AlluxioCore {
}
pub(super) async fn get_status(&self, path: &str) -> Result<FileInfo> {
- let path = build_abs_path(&self.root, path);
+ let path = build_rooted_abs_path(&self.root, path);
let req = Request::post(format!(
- "{}/api/v1/paths//{}/get-status",
- self.endpoint, path
+ "{}/api/v1/paths/{}/get-status",
+ self.endpoint,
+ percent_encode_path(&path)
));
let req = req
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
+
let resp = self.client.send(req).await?;
let status = resp.status();
@@ -250,11 +273,12 @@ impl AlluxioCore {
}
pub(super) async fn list_status(&self, path: &str) -> Result<Vec<FileInfo>> {
- let path = build_abs_path(&self.root, path);
+ let path = build_rooted_abs_path(&self.root, path);
let req = Request::post(format!(
- "{}/api/v1/paths//{}/list-status",
- self.endpoint, path
+ "{}/api/v1/paths/{}/list-status",
+ self.endpoint,
+ percent_encode_path(&path)
));
let req = req
diff --git a/core/src/services/alluxio/error.rs b/core/src/services/alluxio/error.rs
index 8a8941b10..0f19d7a29 100644
--- a/core/src/services/alluxio/error.rs
+++ b/core/src/services/alluxio/error.rs
@@ -46,9 +46,9 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
if let Some(alluxio_err) = alluxio_err {
kind = match alluxio_err.status_code.as_str() {
- "AlreadyExists" => ErrorKind::AlreadyExists,
- "NotFound" => ErrorKind::NotFound,
- "InvalidArgument" => ErrorKind::InvalidInput,
+ "ALREADY_EXISTS" => ErrorKind::AlreadyExists,
+ "NOT_FOUND" => ErrorKind::NotFound,
+ "INVALID_ARGUMENT" => ErrorKind::InvalidInput,
_ => ErrorKind::Unexpected,
}
}
@@ -63,23 +63,46 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
#[cfg(test)]
mod tests {
use super::*;
+ use futures::stream;
+ use http::StatusCode;
/// Error response example is from https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
- #[test]
- fn test_parse_error() {
- let bs = bytes::Bytes::from(
- r#"
- {
- "statusCode": "AlreadyExists",
- "message": "The resource you requested already exist"
- }
-"#,
- );
+ #[tokio::test]
+ async fn test_parse_error() {
+ let err_res = vec![
+ (
+ r#"{"statusCode":"ALREADY_EXISTS","message":"The resource you requested already exist"}"#,
+ ErrorKind::AlreadyExists,
+ ),
+ (
+ r#"{"statusCode":"NOT_FOUND","message":"The resource you requested does not exist"}"#,
+ ErrorKind::NotFound,
+ ),
+ (
+ r#"{"statusCode":"INVALID_ARGUMENT","message":"The argument you provided is invalid"}"#,
+ ErrorKind::InvalidInput,
+ ),
+ (
+ r#"{"statusCode":"INTERNAL_SERVER_ERROR","message":"Internal server error"}"#,
+ ErrorKind::Unexpected,
+ ),
+ ];
- let out: AlluxioError = serde_json::from_reader(bs.reader()).expect("must success");
- println!("{out:?}");
+ for res in err_res {
+ let bs = bytes::Bytes::from(res.0);
+ let body = IncomingAsyncBody::new(
+ Box::new(oio::into_stream(stream::iter(vec![Ok(bs.clone())]))),
+ None,
+ );
+ let resp = Response::builder()
+ .status(StatusCode::INTERNAL_SERVER_ERROR)
+ .body(body)
+ .unwrap();
- assert_eq!(out.status_code, "AlreadyExists");
- assert_eq!(out.message, "The resource you requested already exist");
+ let err = parse_error(resp).await;
+
+ assert!(err.is_ok());
+ assert_eq!(err.unwrap().kind(), res.1);
+ }
}
}
diff --git a/core/src/services/alluxio/pager.rs b/core/src/services/alluxio/pager.rs
index 148e01b79..64483751b 100644
--- a/core/src/services/alluxio/pager.rs
+++ b/core/src/services/alluxio/pager.rs
@@ -22,6 +22,7 @@ use async_trait::async_trait;
use super::core::AlluxioCore;
use crate::raw::oio::Entry;
use crate::raw::*;
+use crate::ErrorKind;
use crate::Result;
pub struct AlluxioPager {
@@ -49,20 +50,39 @@ impl oio::Page for AlluxioPager {
return Ok(None);
}
- let file_infos = self.core.list_status(&self.path).await?;
+ let result = self.core.list_status(&self.path).await;
- let mut entries = vec![];
- for file_info in file_infos {
- let path: String = file_info.path.clone();
- entries.push(Entry::new(&path, file_info.try_into()?));
- }
+ match result {
+ Ok(file_infos) => {
+ let mut entries = vec![];
+ for file_info in file_infos {
+ let path: String = file_info.path.clone();
+ let path = if file_info.folder {
+ format!("{}/", path)
+ } else {
+ path
+ };
+ entries.push(Entry::new(
+ &build_rel_path(&self.core.root, &path),
+ file_info.try_into()?,
+ ));
+ }
- if entries.is_empty() {
- return Ok(None);
- }
+ if entries.is_empty() {
+ return Ok(None);
+ }
- self.done = true;
+ self.done = true;
- Ok(Some(entries))
+ Ok(Some(entries))
+ }
+ Err(e) => {
+ if e.kind() == ErrorKind::NotFound {
+ self.done = true;
+ return Ok(None);
+ }
+ Err(e)
+ }
+ }
}
}
diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs
index 3efbcddf7..f926bd570 100644
--- a/core/src/types/operator/builder.rs
+++ b/core/src/types/operator/builder.rs
@@ -155,6 +155,8 @@ impl Operator {
let op = match scheme {
#[cfg(feature = "services-atomicserver")]
Scheme::Atomicserver => Self::from_map::<services::Atomicserver>(map)?.finish(),
+ #[cfg(feature = "services-alluxio")]
+ Scheme::Alluxio => Self::from_map::<services::Alluxio>(map)?.finish(),
#[cfg(feature = "services-azblob")]
Scheme::Azblob => Self::from_map::<services::Azblob>(map)?.finish(),
#[cfg(feature = "services-azdls")]
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index 224e15b7a..f0bf62b73 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -165,6 +165,8 @@ impl Scheme {
HashSet::from([
#[cfg(feature = "services-atomicserver")]
Scheme::Atomicserver,
+ #[cfg(feature = "services-alluxio")]
+ Scheme::Alluxio,
#[cfg(feature = "services-azblob")]
Scheme::Azblob,
#[cfg(feature = "services-azdls")]
@@ -275,6 +277,7 @@ impl FromStr for Scheme {
match s.as_str() {
"atomicserver" => Ok(Scheme::Atomicserver),
"azblob" => Ok(Scheme::Azblob),
+ "alluxio" => Ok(Scheme::Alluxio),
// Notes:
//
// OpenDAL used to call `azdls` as `azdfs`, we keep it for backward compatibility.
diff --git a/fixtures/alluxio/docker-compose-alluxio.yml b/fixtures/alluxio/docker-compose-alluxio.yml
new file mode 100644
index 000000000..d725fe8c3
--- /dev/null
+++ b/fixtures/alluxio/docker-compose-alluxio.yml
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: "3.8"
+services:
+ alluxio-master:
+ image: alluxio/alluxio:2.9.3
+ ports:
+ - 19999:19999
+ - 19998:19998
+ environment:
+ ALLUXIO_JAVA_OPTS: -Dalluxio.master.hostname=alluxio-master -Dalluxio.master.mount.table.root.ufs=/opt/alluxio/underFSStorage
+ command: master
+ networks:
+ - alluxio_network
+ healthcheck:
+ test: ["CMD", "curl", "-i", "http://localhost:19999/"]
+ interval: 3s
+ timeout: 20s
+ retries: 10
+
+ alluxio-proxy:
+ image: alluxio/alluxio:2.9.3
+ ports:
+ - 39999:39999
+ environment:
+ ALLUXIO_JAVA_OPTS: -Dalluxio.master.hostname=alluxio-master -Dalluxio.proxy.s3.v2.version.enabled=false -Dalluxio.proxy.s3.v2.async.processing.enabled=false
+ command: proxy
+ networks:
+ - alluxio_network
+ healthcheck:
+ test: ["CMD", "curl", "-i", "http://localhost:39999/"]
+ interval: 3s
+ timeout: 20s
+ retries: 10
+
+ alluxio-worker:
+ image: alluxio/alluxio:2.9.3
+ ports:
+ - 29999:29999
+ - 30000:30000
+ shm_size: 1gb
+ environment:
+ ALLUXIO_JAVA_OPTS: -Dalluxio.worker.ramdisk.size=1G -Dalluxio.master.hostname=alluxio-master -Dalluxio.worker.hostname=alluxio-worker
+ command: worker
+ networks:
+ - alluxio_network
+ healthcheck:
+ test: ["CMD", "curl", "-i", "http://localhost:30000/"]
+ interval: 3s
+ timeout: 20s
+ retries: 10
+
+
+networks:
+ alluxio_network:
+ driver: bridge
+
\ No newline at end of file