This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 2ca9a184c feat(service): add alluxio rest api support (#3564)
2ca9a184c is described below
commit 2ca9a184c697086b30300fc737e3a749008f3a36
Author: hoslo <[email protected]>
AuthorDate: Sat Nov 11 22:40:11 2023 +0800
feat(service): add alluxio rest api support (#3564)
feat(service): add alloxio rest api support
---
core/Cargo.toml | 1 +
core/src/services/alluxio/backend.rs | 293 +++++++++++++++++++++++++++++
core/src/services/alluxio/core.rs | 346 +++++++++++++++++++++++++++++++++++
core/src/services/alluxio/docs.md | 47 +++++
core/src/services/alluxio/error.rs | 85 +++++++++
core/src/services/alluxio/mod.rs | 25 +++
core/src/services/alluxio/pager.rs | 68 +++++++
core/src/services/alluxio/writer.rs | 57 ++++++
core/src/services/mod.rs | 7 +
core/src/types/scheme.rs | 3 +
10 files changed, 932 insertions(+)
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 06332d099..f8d61163a 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -148,6 +148,7 @@ services-gdrive = []
services-ghac = []
services-gridfs = ["dep:mongodb"]
services-hdfs = ["dep:hdrs"]
+services-alluxio = []
services-http = []
services-ipfs = ["dep:prost"]
services-ipmfs = []
diff --git a/core/src/services/alluxio/backend.rs
b/core/src/services/alluxio/backend.rs
new file mode 100644
index 000000000..25046e068
--- /dev/null
+++ b/core/src/services/alluxio/backend.rs
@@ -0,0 +1,293 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use log::debug;
+use serde::Deserialize;
+
+use crate::raw::oio::OneShotWriter;
+use crate::raw::*;
+use crate::*;
+
+use super::writer::AlluxioWriter;
+use super::writer::AlluxioWriters;
+use super::{core::AlluxioCore, pager::AlluxioPager};
+
+/// Config for alluxio services support.
+///
+/// Both fields are optional at the type level, but `AlluxioBuilder::build`
+/// returns a `ConfigInvalid` error when `endpoint` is not set.
+#[derive(Default, Deserialize)]
+#[serde(default)]
+#[non_exhaustive]
+pub struct AlluxioConfig {
+ /// root of this backend.
+ ///
+ /// All operations will happen under this root.
+ ///
+ /// default to `/` if not set.
+ pub root: Option<String>,
+ /// endpoint of this backend.
+ ///
+ /// Endpoint must be full uri, mostly like `http://127.0.0.1:39999`.
+ pub endpoint: Option<String>,
+}
+
+impl Debug for AlluxioConfig {
+    /// Renders the config as a non-exhaustive struct listing `root` and `endpoint`.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("AlluxioConfig")
+            .field("root", &self.root)
+            .field("endpoint", &self.endpoint)
+            .finish_non_exhaustive()
+    }
+}
+
+/// [Alluxio](https://www.alluxio.io/) services support.
+#[doc = include_str!("docs.md")]
+#[derive(Default)]
+pub struct AlluxioBuilder {
+ /// Settings collected through the builder setters or `from_map`.
+ config: AlluxioConfig,
+
+ /// Optional caller-supplied HTTP client; `build` creates one when unset.
+ http_client: Option<HttpClient>,
+}
+
+impl Debug for AlluxioBuilder {
+    /// Renders only the `config` field; the HTTP client is left out of the output.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("AlluxioBuilder")
+            .field("config", &self.config)
+            .finish_non_exhaustive()
+    }
+}
+
+impl AlluxioBuilder {
+    /// Set root of this backend.
+    ///
+    /// All operations will happen under this root. Passing an empty string
+    /// clears any previously configured root.
+    pub fn root(&mut self, root: &str) -> &mut Self {
+        self.config.root = (!root.is_empty()).then(|| root.to_string());
+        self
+    }
+
+    /// Set endpoint of this backend.
+    ///
+    /// Endpoint must be full uri, mostly like `http://127.0.0.1:39999`.
+    /// Passing an empty string leaves the current endpoint untouched.
+    pub fn endpoint(&mut self, endpoint: &str) -> &mut Self {
+        if endpoint.is_empty() {
+            return self;
+        }
+
+        // Trim trailing `/` so that we can accept `http://127.0.0.1:39999/`
+        let trimmed = endpoint.trim_end_matches('/');
+        self.config.endpoint = Some(trimmed.to_string());
+        self
+    }
+
+    /// Specify the http client that used by this service.
+    ///
+    /// # Notes
+    ///
+    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
+    /// during minor updates.
+    pub fn http_client(&mut self, client: HttpClient) -> &mut Self {
+        self.http_client = Some(client);
+        self
+    }
+}
+
+impl Builder for AlluxioBuilder {
+    const SCHEME: Scheme = Scheme::Alluxio;
+    type Accessor = AlluxioBackend;
+
+    /// Converts a HashMap into an AlluxioBuilder instance.
+    ///
+    /// # Arguments
+    ///
+    /// * `map` - A HashMap containing the configuration values.
+    ///
+    /// # Returns
+    ///
+    /// Returns an instance of AlluxioBuilder.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the map cannot be deserialized into an [`AlluxioConfig`].
+    fn from_map(map: HashMap<String, String>) -> Self {
+        // Deserialize the configuration from the HashMap.
+        let config = AlluxioConfig::deserialize(ConfigDeserializer::new(map))
+            .expect("config deserialize must succeed");
+
+        // Create an AlluxioBuilder instance with the deserialized config.
+        AlluxioBuilder {
+            config,
+            http_client: None,
+        }
+    }
+
+    /// Builds the backend and returns the result of AlluxioBackend.
+    ///
+    /// # Errors
+    ///
+    /// Returns `ErrorKind::ConfigInvalid` when no endpoint has been set, and
+    /// forwards the error from `HttpClient::new` when the default client
+    /// cannot be created. Both are tagged with the Alluxio scheme.
+    fn build(&mut self) -> Result<Self::Accessor> {
+        debug!("backend build started: {:?}", &self);
+
+        let root = normalize_root(self.config.root.as_deref().unwrap_or_default());
+        debug!("backend use root {}", &root);
+
+        let endpoint = self.config.endpoint.clone().ok_or_else(|| {
+            Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
+                .with_operation("Builder::build")
+                .with_context("service", Scheme::Alluxio)
+        })?;
+        debug!("backend use endpoint {}", &endpoint);
+
+        // Prefer the caller-supplied client; otherwise fall back to a default one.
+        let client = match self.http_client.take() {
+            Some(client) => client,
+            None => HttpClient::new().map_err(|err| {
+                err.with_operation("Builder::build")
+                    .with_context("service", Scheme::Alluxio)
+            })?,
+        };
+
+        Ok(AlluxioBackend {
+            core: Arc::new(AlluxioCore {
+                root,
+                endpoint,
+                client,
+            }),
+        })
+    }
+}
+
+/// Accessor implementation for Alluxio; holds the shared `AlluxioCore`
+/// that its operations call into.
+#[derive(Debug, Clone)]
+pub struct AlluxioBackend {
+ core: Arc<AlluxioCore>,
+}
+
+#[async_trait]
+impl Accessor for AlluxioBackend {
+    type Reader = IncomingAsyncBody;
+    type BlockingReader = ();
+    type Writer = AlluxioWriters;
+    type BlockingWriter = ();
+    type Pager = AlluxioPager;
+    type BlockingPager = ();
+
+    /// Describes the scheme, root and native capabilities of this backend.
+    fn info(&self) -> AccessorInfo {
+        let mut am = AccessorInfo::default();
+        am.set_scheme(Scheme::Alluxio)
+            .set_root(&self.core.root)
+            .set_native_capability(Capability {
+                stat: true,
+
+                read: true,
+
+                write: true,
+                // Append is not available, see
+                // https://github.com/Alluxio/alluxio/issues/8212
+                write_can_append: false,
+
+                create_dir: true,
+                delete: true,
+                rename: true,
+
+                list: true,
+                list_without_recursive: true,
+
+                ..Default::default()
+            });
+
+        am
+    }
+
+    /// Creates the directory at `path`.
+    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
+        self.core.create_dir(path).await?;
+        Ok(RpCreateDir::default())
+    }
+
+    /// Opens `path` and streams its content back.
+    ///
+    /// Any status other than `200 OK` / `206 Partial Content` is converted
+    /// into an error instead of being handed to the caller as file content.
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        let stream_id = self.core.open_file(path).await?;
+
+        let resp = self.core.read(stream_id, args.range()).await?;
+
+        match resp.status() {
+            http::StatusCode::OK | http::StatusCode::PARTIAL_CONTENT => {
+                let size = parse_content_length(resp.headers())?;
+                Ok((RpRead::new().with_size(size), resp.into_body()))
+            }
+            _ => Err(super::error::parse_error(resp).await?),
+        }
+    }
+
+    /// Returns a one-shot writer; nothing is sent until the writer is driven.
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        let w = AlluxioWriter::new(self.core.clone(), args, path.to_string());
+        let w = OneShotWriter::new(w);
+
+        Ok((RpWrite::default(), w))
+    }
+
+    /// Fetches the status of `path` and converts it into `Metadata`.
+    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+        let file_info = self.core.get_status(path).await?;
+
+        Ok(RpStat::new(file_info.try_into()?))
+    }
+
+    /// Renames `from` to `to`.
+    async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result<RpRename> {
+        self.core.rename(from, to).await?;
+
+        Ok(RpRename::default())
+    }
+
+    /// Deletes `path`.
+    async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
+        self.core.delete(path).await?;
+
+        Ok(RpDelete::default())
+    }
+
+    /// Returns a pager over `path`; the listing request is issued lazily by the pager.
+    async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Pager)> {
+        Ok((
+            RpList::default(),
+            AlluxioPager::new(self.core.clone(), path),
+        ))
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    /// `from_map` must carry `root` and `endpoint` over into the config verbatim.
+    #[test]
+    fn test_builder_from_map() {
+        let map = HashMap::from([
+            ("root".to_string(), "/".to_string()),
+            ("endpoint".to_string(), "http://127.0.0.1:39999".to_string()),
+        ]);
+
+        let AlluxioBuilder { config, .. } = AlluxioBuilder::from_map(map);
+
+        assert_eq!(config.root, Some("/".to_string()));
+        assert_eq!(config.endpoint, Some("http://127.0.0.1:39999".to_string()));
+    }
+
+    /// A builder with both root and endpoint set must build successfully.
+    #[test]
+    fn test_builder_build() {
+        let mut builder = AlluxioBuilder::default();
+        builder.root("/root").endpoint("http://127.0.0.1:39999");
+
+        let backend = builder.build();
+
+        assert!(backend.is_ok());
+    }
+}
diff --git a/core/src/services/alluxio/core.rs
b/core/src/services/alluxio/core.rs
new file mode 100644
index 000000000..b746e4b1a
--- /dev/null
+++ b/core/src/services/alluxio/core.rs
@@ -0,0 +1,346 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+
+use http::Request;
+
+use http::header::RANGE;
+use http::Response;
+use http::StatusCode;
+
+use serde::{Deserialize, Serialize};
+
+use crate::raw::*;
+use crate::*;
+
+use super::error::parse_error;
+
+/// JSON body sent with the `create-file` request.
+#[derive(Debug, Serialize)]
+struct CreateFileRequest {
+ /// Omitted from the serialized JSON when `None`.
+ #[serde(skip_serializing_if = "Option::is_none")]
+ recursive: Option<bool>,
+}
+
+/// JSON body sent with the `create-directory` request.
+#[derive(Debug, Serialize)]
+struct CreateDirRequest {
+ /// Omitted from the serialized JSON when `None`.
+ #[serde(skip_serializing_if = "Option::is_none")]
+ recursive: Option<bool>,
+}
+
+/// Metadata of alluxio object
+///
+/// Deserialized from JSON with camelCase keys (for example
+/// `lastModificationTimeMs`).
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub(super) struct FileInfo {
+ /// The path of the object
+ pub path: String,
+ /// The last modification time of the object
+ // Passed to `parse_datetime_from_from_timestamp_millis` when converting to `Metadata`.
+ pub last_modification_time_ms: i64,
+ /// Whether the object is a folder
+ pub folder: bool,
+ /// The length of the object in bytes
+ pub length: u64,
+}
+
+impl TryFrom<FileInfo> for Metadata {
+    type Error = Error;
+
+    /// Maps an Alluxio `FileInfo` onto OpenDAL `Metadata`: entry mode from
+    /// `folder`, content length from `length`, and last-modified from the
+    /// millisecond timestamp. Fails when the timestamp cannot be parsed.
+    fn try_from(file_info: FileInfo) -> Result<Metadata> {
+        let mode = match file_info.folder {
+            true => EntryMode::DIR,
+            false => EntryMode::FILE,
+        };
+        let last_modified =
+            parse_datetime_from_from_timestamp_millis(file_info.last_modification_time_ms)?;
+
+        let mut metadata = Metadata::new(mode);
+        metadata.set_content_length(file_info.length);
+        metadata.set_last_modified(last_modified);
+        Ok(metadata)
+    }
+}
+
+/// Alluxio core
+///
+/// Holds what every REST call needs: the root, the endpoint and the HTTP client.
+#[derive(Clone)]
+pub struct AlluxioCore {
+ /// root of this backend.
+ pub root: String,
+ /// endpoint of alluxio
+ pub endpoint: String,
+ /// http client used to send requests to alluxio
+ pub client: HttpClient,
+}
+
+impl Debug for AlluxioCore {
+    /// Renders `root` and `endpoint` under the type's own name; the HTTP
+    /// client is left out of the output.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("AlluxioCore")
+            .field("root", &self.root)
+            .field("endpoint", &self.endpoint)
+            .finish_non_exhaustive()
+    }
+}
+
+impl AlluxioCore {
+    /// Builds the URL of `action` on `path`, where `path` is resolved under the root.
+    fn path_url(&self, path: &str, action: &str) -> String {
+        format!(
+            "{}/api/v1/paths//{}/{}",
+            self.endpoint,
+            build_abs_path(&self.root, path),
+            action
+        )
+    }
+
+    /// Builds the URL of `action` on the stream identified by `stream_id`.
+    fn stream_url(&self, stream_id: u64, action: &str) -> String {
+        format!("{}/api/v1/streams/{}/{}", self.endpoint, stream_id, action)
+    }
+
+    /// Sends a POST request without a body to `url`.
+    async fn post_empty(&self, url: String) -> Result<Response<IncomingAsyncBody>> {
+        let req = Request::post(url)
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.client.send(req).await
+    }
+
+    /// Sends a POST request carrying an already-serialized JSON `body` to `url`.
+    async fn post_json(&self, url: String, body: Vec<u8>) -> Result<Response<IncomingAsyncBody>> {
+        let req = Request::post(url)
+            .header("Content-Type", "application/json")
+            .body(AsyncBody::Bytes(bytes::Bytes::from(body)))
+            .map_err(new_request_build_error)?;
+
+        self.client.send(req).await
+    }
+
+    /// Succeeds on `200 OK`; any other status is converted through `parse_error`.
+    async fn expect_ok(resp: Response<IncomingAsyncBody>) -> Result<()> {
+        match resp.status() {
+            StatusCode::OK => Ok(()),
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    /// Decodes the JSON body of a `200 OK` response; any other status is
+    /// converted through `parse_error`.
+    async fn parse_json_body<T: serde::de::DeserializeOwned>(
+        resp: Response<IncomingAsyncBody>,
+    ) -> Result<T> {
+        match resp.status() {
+            StatusCode::OK => {
+                let body = resp.into_body().bytes().await?;
+                // Decoding failures are deserialize errors, not serialize errors.
+                serde_json::from_slice(&body).map_err(new_json_deserialize_error)
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    /// Creates the directory at `path`, requesting recursive creation.
+    pub async fn create_dir(&self, path: &str) -> Result<()> {
+        let payload = CreateDirRequest {
+            recursive: Some(true),
+        };
+        let body = serde_json::to_vec(&payload).map_err(new_json_serialize_error)?;
+
+        let resp = self
+            .post_json(self.path_url(path, "create-directory"), body)
+            .await?;
+
+        Self::expect_ok(resp).await
+    }
+
+    /// Creates the file at `path` (recursive creation requested) and returns
+    /// the id of the stream to write it through.
+    pub async fn create_file(&self, path: &str) -> Result<u64> {
+        let payload = CreateFileRequest {
+            recursive: Some(true),
+        };
+        let body = serde_json::to_vec(&payload).map_err(new_json_serialize_error)?;
+
+        let resp = self
+            .post_json(self.path_url(path, "create-file"), body)
+            .await?;
+
+        Self::parse_json_body::<u64>(resp).await
+    }
+
+    /// Opens the file at `path` and returns the id of the stream to read it through.
+    pub(super) async fn open_file(&self, path: &str) -> Result<u64> {
+        let resp = self.post_empty(self.path_url(path, "open-file")).await?;
+
+        Self::parse_json_body::<u64>(resp).await
+    }
+
+    /// Deletes the object at `path`.
+    pub(super) async fn delete(&self, path: &str) -> Result<()> {
+        let resp = self.post_empty(self.path_url(path, "delete")).await?;
+
+        Self::expect_ok(resp).await
+    }
+
+    /// Renames `path` to `dst`.
+    ///
+    /// Both paths are resolved under the root, so the destination lands
+    /// inside the configured root just like the source.
+    pub(super) async fn rename(&self, path: &str, dst: &str) -> Result<()> {
+        let dst = build_abs_path(&self.root, dst);
+        let url = format!("{}?dst=/{}", self.path_url(path, "rename"), dst);
+
+        let resp = self.post_empty(url).await?;
+
+        Self::expect_ok(resp).await
+    }
+
+    /// Fetches the status of the object at `path`.
+    pub(super) async fn get_status(&self, path: &str) -> Result<FileInfo> {
+        let resp = self.post_empty(self.path_url(path, "get-status")).await?;
+
+        Self::parse_json_body::<FileInfo>(resp).await
+    }
+
+    /// Lists the statuses of the objects under `path`.
+    pub(super) async fn list_status(&self, path: &str) -> Result<Vec<FileInfo>> {
+        let resp = self.post_empty(self.path_url(path, "list-status")).await?;
+
+        Self::parse_json_body::<Vec<FileInfo>>(resp).await
+    }
+
+    /// Issues a read on an opened stream and returns the raw response.
+    ///
+    /// The response status is NOT inspected here; callers must check it.
+    ///
+    /// # Errors
+    ///
+    /// Returns `ErrorKind::Unsupported` for suffix ranges (a size without an offset).
+    pub async fn read(
+        &self,
+        stream_id: u64,
+        range: BytesRange,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let mut req = Request::post(self.stream_url(stream_id, "read"));
+
+        if !range.is_full() {
+            // alluxio doesn't support read with suffix range.
+            if range.offset().is_none() && range.size().is_some() {
+                return Err(Error::new(
+                    ErrorKind::Unsupported,
+                    "alluxio doesn't support read with suffix range",
+                ));
+            }
+
+            req = req.header(RANGE, range.to_header());
+        }
+
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.client.send(req).await
+    }
+
+    /// Sends `body` to an opened write stream.
+    ///
+    /// Returns the length in bytes of the response body.
+    // NOTE(review): the response body presumably encodes the number of bytes
+    // written rather than being that long itself — confirm against the
+    // Alluxio REST API before relying on the returned value.
+    pub(super) async fn write(&self, stream_id: u64, body: AsyncBody) -> Result<usize> {
+        let req = Request::post(self.stream_url(stream_id, "write"))
+            .body(body)
+            .map_err(new_request_build_error)?;
+
+        let resp = self.client.send(req).await?;
+
+        match resp.status() {
+            StatusCode::OK => {
+                let body = resp.into_body().bytes().await?;
+                Ok(body.len())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    /// Closes the stream identified by `stream_id`.
+    pub(super) async fn close(&self, stream_id: u64) -> Result<()> {
+        let resp = self.post_empty(self.stream_url(stream_id, "close")).await?;
+
+        Self::expect_ok(resp).await
+    }
+}
diff --git a/core/src/services/alluxio/docs.md
b/core/src/services/alluxio/docs.md
new file mode 100644
index 000000000..33775c226
--- /dev/null
+++ b/core/src/services/alluxio/docs.md
@@ -0,0 +1,47 @@
+## Capabilities
+
+This service can be used to:
+
+- [x] stat
+- [x] read
+- [x] write
+- [x] create_dir
+- [x] delete
+- [ ] copy
+- [x] rename
+- [x] list
+- [ ] scan
+- [ ] presign
+- [ ] blocking
+
+## Configuration
+
+- `root`: Set the work directory for backend
+- `endpoint`: Customizable endpoint setting
+
+You can refer to [`AlluxioBuilder`]'s docs for more information
+
+## Example
+
+### Via Builder
+
+```rust
+use anyhow::Result;
+use opendal::services::Alluxio;
+use opendal::Operator;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ // create backend builder
+ let mut builder = Alluxio::default();
+
+ // set the root (working directory) for OpenDAL
+ builder.root("/");
+ // set the endpoint for OpenDAL
+ builder.endpoint("http://127.0.0.1:39999");
+
+ let op: Operator = Operator::new(builder)?.finish();
+
+ Ok(())
+}
+```
diff --git a/core/src/services/alluxio/error.rs
b/core/src/services/alluxio/error.rs
new file mode 100644
index 000000000..8a8941b10
--- /dev/null
+++ b/core/src/services/alluxio/error.rs
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Buf;
+use http::Response;
+use serde::Deserialize;
+
+use crate::raw::*;
+use crate::*;
+
+/// the error response of alluxio
+///
+/// Deserialized from JSON with camelCase keys (`statusCode`, `message`).
+#[derive(Default, Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+// `message` is only surfaced through the `Debug` output in `parse_error`,
+// which the dead-code lint does not count as a read.
+#[allow(dead_code)]
+struct AlluxioError {
+ status_code: String,
+ message: String,
+}
+
+/// Converts an Alluxio error response into an OpenDAL [`Error`].
+///
+/// When the body parses as an `AlluxioError`, its `statusCode` selects the
+/// error kind (`AlreadyExists`, `NotFound`, `InvalidArgument`; anything else
+/// is `Unexpected`) and the message is the struct's `Debug` rendering.
+/// Otherwise the kind is `Unexpected` and the message is the raw body decoded
+/// lossily as UTF-8. The response parts are attached as error context.
+///
+/// # Errors
+///
+/// Returns `Err` only when reading the response body fails.
+pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
+    let (parts, body) = resp.into_parts();
+    let bs = body.bytes().await?;
+
+    // The HTTP status alone never changes the kind, so the kind is derived
+    // solely from the decoded body.
+    let (kind, message) = match serde_json::from_reader::<_, AlluxioError>(bs.clone().reader()) {
+        Ok(alluxio_err) => {
+            let kind = match alluxio_err.status_code.as_str() {
+                "AlreadyExists" => ErrorKind::AlreadyExists,
+                "NotFound" => ErrorKind::NotFound,
+                "InvalidArgument" => ErrorKind::InvalidInput,
+                _ => ErrorKind::Unexpected,
+            };
+            (kind, format!("{alluxio_err:?}"))
+        }
+        Err(_) => (
+            ErrorKind::Unexpected,
+            String::from_utf8_lossy(&bs).into_owned(),
+        ),
+    };
+
+    let err = Error::new(kind, &message);
+
+    Ok(with_error_response_context(err, parts))
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ /// Checks that a JSON body with camelCase `statusCode` and `message` keys
+ /// deserializes into `AlluxioError`.
+ #[test]
+ fn test_parse_error() {
+ let bs = bytes::Bytes::from(
+ r#"
+ {
+ "statusCode": "AlreadyExists",
+ "message": "The resource you requested already exist"
+ }
+"#,
+ );
+
+ let out: AlluxioError =
serde_json::from_reader(bs.reader()).expect("must success");
+ println!("{out:?}");
+
+ assert_eq!(out.status_code, "AlreadyExists");
+ assert_eq!(out.message, "The resource you requested already exist");
+ }
+}
diff --git a/core/src/services/alluxio/mod.rs b/core/src/services/alluxio/mod.rs
new file mode 100644
index 000000000..707df837a
--- /dev/null
+++ b/core/src/services/alluxio/mod.rs
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod backend;
+pub use backend::AlluxioBuilder as Alluxio;
+pub use backend::AlluxioConfig;
+
+mod core;
+mod error;
+mod pager;
+mod writer;
diff --git a/core/src/services/alluxio/pager.rs
b/core/src/services/alluxio/pager.rs
new file mode 100644
index 000000000..148e01b79
--- /dev/null
+++ b/core/src/services/alluxio/pager.rs
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use super::core::AlluxioCore;
+use crate::raw::oio::Entry;
+use crate::raw::*;
+use crate::Result;
+
+/// Pager that lists the entries under one path.
+pub struct AlluxioPager {
+ // Shared core used to issue the `list_status` call.
+ core: Arc<AlluxioCore>,
+
+ // Path to list, as given to `new`.
+ path: String,
+
+ // Set once a non-empty page has been returned; later calls yield `None`.
+ done: bool,
+}
+
+impl AlluxioPager {
+    /// Creates a pager for `path` that has not issued its listing request yet.
+    pub(super) fn new(core: Arc<AlluxioCore>, path: &str) -> Self {
+        let path = path.to_string();
+
+        Self {
+            core,
+            path,
+            done: false,
+        }
+    }
+}
+
+#[async_trait]
+impl oio::Page for AlluxioPager {
+    /// Returns the whole listing on the first call and `None` afterwards.
+    ///
+    /// Entry paths are made relative to the backend root, and directory
+    /// entries get a trailing `/`.
+    async fn next(&mut self) -> Result<Option<Vec<Entry>>> {
+        if self.done {
+            return Ok(None);
+        }
+
+        let file_infos = self.core.list_status(&self.path).await?;
+
+        let mut entries = Vec::with_capacity(file_infos.len());
+        for file_info in file_infos {
+            // `FileInfo::path` is absolute on the Alluxio side; callers expect
+            // root-relative paths with directories ending in `/`.
+            let path = if file_info.folder {
+                format!("{}/", file_info.path)
+            } else {
+                file_info.path.clone()
+            };
+            let rel_path = build_rel_path(&self.core.root, &path);
+
+            entries.push(Entry::new(&rel_path, file_info.try_into()?));
+        }
+
+        // The single response holds the full listing, so an empty result is
+        // final too and must not trigger another request on the next call.
+        self.done = true;
+
+        if entries.is_empty() {
+            return Ok(None);
+        }
+
+        Ok(Some(entries))
+    }
+}
diff --git a/core/src/services/alluxio/writer.rs
b/core/src/services/alluxio/writer.rs
new file mode 100644
index 000000000..4eb52ae90
--- /dev/null
+++ b/core/src/services/alluxio/writer.rs
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use crate::raw::*;
+use crate::Result;
+
+use super::core::AlluxioCore;
+
+/// Writer type exposed by the backend: `AlluxioWriter` wrapped in a one-shot writer.
+pub type AlluxioWriters = oio::OneShotWriter<AlluxioWriter>;
+
+/// Writes a whole object to Alluxio in a single create/write/close sequence.
+pub struct AlluxioWriter {
+ // Shared core used to issue the REST calls.
+ core: Arc<AlluxioCore>,
+
+ // Write options; stored but not read by `write_once`.
+ _op: OpWrite,
+ // Path of the file to create.
+ path: String,
+}
+
+impl AlluxioWriter {
+    /// Creates a writer for `path`; no request is sent until `write_once` runs.
+    pub fn new(core: Arc<AlluxioCore>, _op: OpWrite, path: String) -> Self {
+        Self { core, _op, path }
+    }
+}
+
+#[async_trait]
+impl oio::OneShotWrite for AlluxioWriter {
+    /// Creates the file, sends the whole buffer through the returned stream,
+    /// then closes the stream. The first failing step aborts the sequence.
+    async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
+        // Snapshot the buffered bytes before any request is sent.
+        let chunks = bs.vectored_bytes(bs.remaining());
+        let payload = AsyncBody::ChunkedBytes(oio::ChunkedBytes::from_vec(chunks));
+
+        let stream_id = self.core.create_file(&self.path).await?;
+        self.core.write(stream_id, payload).await?;
+
+        self.core.close(stream_id).await
+    }
+}
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index 8ed196f87..9ba0246ca 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -282,3 +282,10 @@ pub use self::dbfs::Dbfs;
mod swift;
#[cfg(feature = "services-swift")]
pub use self::swift::Swift;
+
+#[cfg(feature = "services-alluxio")]
+mod alluxio;
+#[cfg(feature = "services-alluxio")]
+pub use alluxio::Alluxio;
+#[cfg(feature = "services-alluxio")]
+pub use alluxio::AlluxioConfig;
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index ec582a637..224e15b7a 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -66,6 +66,8 @@ pub enum Scheme {
Hdfs,
/// [http][crate::services::Http]: HTTP backend.
Http,
+ /// [alluxio][crate::services::Alluxio]: Alluxio services.
+ Alluxio,
/// [ipmfs][crate::services::Ipfs]: IPFS HTTP Gateway
Ipfs,
@@ -378,6 +380,7 @@ impl From<Scheme> for &'static str {
Scheme::Azfile => "azfile",
Scheme::Sqlite => "sqlite",
Scheme::Mongodb => "mongodb",
+ Scheme::Alluxio => "alluxio",
Scheme::Custom(v) => v,
}
}