This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 2ca9a184c feat(service): add alluxio rest api support (#3564)
2ca9a184c is described below

commit 2ca9a184c697086b30300fc737e3a749008f3a36
Author: hoslo <[email protected]>
AuthorDate: Sat Nov 11 22:40:11 2023 +0800

    feat(service): add alluxio rest api support (#3564)
    
    feat(service): add alloxio rest api support
---
 core/Cargo.toml                      |   1 +
 core/src/services/alluxio/backend.rs | 293 +++++++++++++++++++++++++++++
 core/src/services/alluxio/core.rs    | 346 +++++++++++++++++++++++++++++++++++
 core/src/services/alluxio/docs.md    |  47 +++++
 core/src/services/alluxio/error.rs   |  85 +++++++++
 core/src/services/alluxio/mod.rs     |  25 +++
 core/src/services/alluxio/pager.rs   |  68 +++++++
 core/src/services/alluxio/writer.rs  |  57 ++++++
 core/src/services/mod.rs             |   7 +
 core/src/types/scheme.rs             |   3 +
 10 files changed, 932 insertions(+)

diff --git a/core/Cargo.toml b/core/Cargo.toml
index 06332d099..f8d61163a 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -148,6 +148,7 @@ services-gdrive = []
 services-ghac = []
 services-gridfs = ["dep:mongodb"]
 services-hdfs = ["dep:hdrs"]
+services-alluxio = []
 services-http = []
 services-ipfs = ["dep:prost"]
 services-ipmfs = []
diff --git a/core/src/services/alluxio/backend.rs b/core/src/services/alluxio/backend.rs
new file mode 100644
index 000000000..25046e068
--- /dev/null
+++ b/core/src/services/alluxio/backend.rs
@@ -0,0 +1,293 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use log::debug;
+use serde::Deserialize;
+
+use crate::raw::oio::OneShotWriter;
+use crate::raw::*;
+use crate::*;
+
+use super::writer::AlluxioWriter;
+use super::writer::AlluxioWriters;
+use super::{core::AlluxioCore, pager::AlluxioPager};
+
+/// Config for Alluxio service support.
+///
+/// Every field is optional so the struct can be deserialized from a partial
+/// key/value map (`#[serde(default)]`).
+#[derive(Default, Deserialize)]
+#[serde(default)]
+#[non_exhaustive]
+pub struct AlluxioConfig {
+    /// Root of this backend.
+    ///
+    /// All operations will happen under this root.
+    ///
+    /// Defaults to `/` if not set.
+    pub root: Option<String>,
+    /// Endpoint of this backend.
+    ///
+    /// Endpoint must be a full URI, for example `http://127.0.0.1:39999`.
+    pub endpoint: Option<String>,
+}
+
+impl Debug for AlluxioConfig {
+    /// Formats `root` and `endpoint`, ending with `..` to signal that the
+    /// struct may have further fields.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("AlluxioConfig")
+            .field("root", &self.root)
+            .field("endpoint", &self.endpoint)
+            .finish_non_exhaustive()
+    }
+}
+
+/// [Alluxio](https://www.alluxio.io/) services support.
+#[doc = include_str!("docs.md")]
+#[derive(Default)]
+pub struct AlluxioBuilder {
+    /// Settings (`root`, `endpoint`) collected through the setters or `from_map`.
+    config: AlluxioConfig,
+
+    /// Optional caller-supplied HTTP client; `build` takes it out of the builder.
+    http_client: Option<HttpClient>,
+}
+
+impl Debug for AlluxioBuilder {
+    /// Formats only the `config` field; the HTTP client is left out.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("AlluxioBuilder")
+            .field("config", &self.config)
+            .finish_non_exhaustive()
+    }
+}
+
+impl AlluxioBuilder {
+    /// Set root of this backend.
+    ///
+    /// All operations will happen under this root. Passing an empty string
+    /// clears any previously configured root.
+    pub fn root(&mut self, root: &str) -> &mut Self {
+        self.config.root = if root.is_empty() {
+            None
+        } else {
+            Some(root.to_string())
+        };
+
+        self
+    }
+
+    /// Set endpoint of this backend.
+    ///
+    /// Endpoint must be full uri, mostly like `http://127.0.0.1:39999`.
+    /// An empty string is ignored and keeps the previously configured value.
+    pub fn endpoint(&mut self, endpoint: &str) -> &mut Self {
+        if !endpoint.is_empty() {
+            // Trim trailing `/` so that we can accept `http://127.0.0.1:39999/`
+            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string())
+        }
+
+        self
+    }
+
+    /// Specify the http client that used by this service.
+    ///
+    /// # Notes
+    ///
+    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
+    /// during minor updates.
+    pub fn http_client(&mut self, client: HttpClient) -> &mut Self {
+        self.http_client = Some(client);
+        self
+    }
+}
+
+impl Builder for AlluxioBuilder {
+    const SCHEME: Scheme = Scheme::Alluxio;
+    type Accessor = AlluxioBackend;
+
+    /// Converts a HashMap into an AlluxioBuilder instance.
+    ///
+    /// # Arguments
+    ///
+    /// * `map` - A HashMap containing the configuration values.
+    ///
+    /// # Returns
+    ///
+    /// Returns an instance of AlluxioBuilder.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `map` cannot be deserialized into an `AlluxioConfig`.
+    fn from_map(map: HashMap<String, String>) -> Self {
+        // Deserialize the configuration from the HashMap.
+        let config = AlluxioConfig::deserialize(ConfigDeserializer::new(map))
+            .expect("config deserialize must succeed");
+
+        // Create an AlluxioBuilder instance with the deserialized config.
+        AlluxioBuilder {
+            config,
+            http_client: None,
+        }
+    }
+
+    /// Builds the backend and returns the result of AlluxioBackend.
+    ///
+    /// # Errors
+    ///
+    /// Returns `ErrorKind::ConfigInvalid` when no endpoint has been set, or
+    /// the error from `HttpClient::new` when no client was supplied and the
+    /// default one cannot be created.
+    fn build(&mut self) -> Result<Self::Accessor> {
+        debug!("backend build started: {:?}", &self);
+
+        let root = normalize_root(&self.config.root.clone().unwrap_or_default());
+        debug!("backend use root {}", &root);
+
+        let endpoint = match &self.config.endpoint {
+            Some(endpoint) => Ok(endpoint.clone()),
+            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
+                .with_operation("Builder::build")
+                .with_context("service", Scheme::Alluxio)),
+        }?;
+        debug!("backend use endpoint {}", &endpoint);
+
+        // `take()` moves a user-supplied client out of the builder; otherwise
+        // fall back to a freshly created default client.
+        let client = if let Some(client) = self.http_client.take() {
+            client
+        } else {
+            HttpClient::new().map_err(|err| {
+                err.with_operation("Builder::build")
+                    .with_context("service", Scheme::Alluxio)
+            })?
+        };
+
+        Ok(AlluxioBackend {
+            core: Arc::new(AlluxioCore {
+                root,
+                endpoint,
+                client,
+            }),
+        })
+    }
+}
+
+/// Accessor for Alluxio; every operation is delegated to the shared `AlluxioCore`.
+#[derive(Debug, Clone)]
+pub struct AlluxioBackend {
+    core: Arc<AlluxioCore>,
+}
+
+#[async_trait]
+impl Accessor for AlluxioBackend {
+    type Reader = IncomingAsyncBody;
+    type BlockingReader = ();
+    type Writer = AlluxioWriters;
+    type BlockingWriter = ();
+    type Pager = AlluxioPager;
+    type BlockingPager = ();
+
+    /// Describe this backend: scheme, root and the natively supported operations.
+    fn info(&self) -> AccessorInfo {
+        let mut am = AccessorInfo::default();
+        am.set_scheme(Scheme::Alluxio)
+            .set_root(&self.core.root)
+            .set_native_capability(Capability {
+                stat: true,
+
+                read: true,
+
+                write: true,
+                // Append is not supported, see
+                // https://github.com/Alluxio/alluxio/issues/8212
+                write_can_append: false,
+
+                create_dir: true,
+                delete: true,
+                rename: true,
+
+                list: true,
+                list_without_recursive: true,
+
+                ..Default::default()
+            });
+
+        am
+    }
+
+    /// Create the directory at `path`.
+    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
+        self.core.create_dir(path).await?;
+        Ok(RpCreateDir::default())
+    }
+
+    /// Open `path` as a stream and read the range requested in `args`.
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        // NOTE(review): the stream opened here is never closed by this method —
+        // confirm whether the server reclaims it on its own.
+        let stream_id = self.core.open_file(path).await?;
+
+        let resp = self.core.read(stream_id, args.range()).await?;
+
+        let size = parse_content_length(resp.headers())?;
+        Ok((RpRead::new().with_size(size), resp.into_body()))
+    }
+
+    /// Return a one-shot writer for `path`; nothing is sent until data is written.
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        // `args` is owned, so it is moved into the writer without cloning.
+        let w = AlluxioWriter::new(self.core.clone(), args, path.to_string());
+        let w = OneShotWriter::new(w);
+
+        Ok((RpWrite::default(), w))
+    }
+
+    /// Fetch the status of `path` and convert it into OpenDAL metadata.
+    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+        let file_info = self.core.get_status(path).await?;
+
+        Ok(RpStat::new(file_info.try_into()?))
+    }
+
+    /// Rename `from` to `to`.
+    async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result<RpRename> {
+        self.core.rename(from, to).await?;
+
+        Ok(RpRename::default())
+    }
+
+    /// Delete `path`.
+    async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
+        self.core.delete(path).await?;
+
+        Ok(RpDelete::default())
+    }
+
+    /// Return a pager over `path`; the request is issued on the pager's first `next`.
+    async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Pager)> {
+        Ok((
+            RpList::default(),
+            AlluxioPager::new(self.core.clone(), path),
+        ))
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    /// `from_map` must carry `root` and `endpoint` over into the config unchanged.
+    #[test]
+    fn test_builder_from_map() {
+        let mut map = HashMap::new();
+        map.insert("root".to_string(), "/".to_string());
+        map.insert("endpoint".to_string(), "http://127.0.0.1:39999".to_string());
+
+        let builder = AlluxioBuilder::from_map(map);
+
+        assert_eq!(builder.config.root, Some("/".to_string()));
+        assert_eq!(
+            builder.config.endpoint,
+            Some("http://127.0.0.1:39999".to_string())
+        );
+    }
+
+    /// A builder with both root and endpoint set must build successfully.
+    #[test]
+    fn test_builder_build() {
+        let mut builder = AlluxioBuilder::default();
+        builder.root("/root").endpoint("http://127.0.0.1:39999");
+
+        let builder = builder.build();
+
+        assert!(builder.is_ok());
+    }
+}
diff --git a/core/src/services/alluxio/core.rs b/core/src/services/alluxio/core.rs
new file mode 100644
index 000000000..b746e4b1a
--- /dev/null
+++ b/core/src/services/alluxio/core.rs
@@ -0,0 +1,346 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+
+use http::Request;
+
+use http::header::RANGE;
+use http::Response;
+use http::StatusCode;
+
+use serde::{Deserialize, Serialize};
+
+use crate::raw::*;
+use crate::*;
+
+use super::error::parse_error;
+
+/// JSON body sent with a `create-file` request.
+#[derive(Debug, Serialize)]
+struct CreateFileRequest {
+    /// Serialized as `recursive`; left out of the JSON entirely when `None`.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    recursive: Option<bool>,
+}
+
+/// JSON body sent with a `create-directory` request.
+#[derive(Debug, Serialize)]
+struct CreateDirRequest {
+    /// Serialized as `recursive`; left out of the JSON entirely when `None`.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    recursive: Option<bool>,
+}
+
+/// Metadata of an Alluxio object, deserialized from camelCase JSON keys
+/// (for example `lastModificationTimeMs`).
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub(super) struct FileInfo {
+    /// The path of the object
+    pub path: String,
+    /// The last modification time of the object, as a millisecond timestamp
+    pub last_modification_time_ms: i64,
+    /// Whether the object is a folder
+    pub folder: bool,
+    /// The length of the object in bytes
+    pub length: u64,
+}
+
+impl TryFrom<FileInfo> for Metadata {
+    type Error = Error;
+
+    /// Build OpenDAL metadata (mode, content length, last-modified) from an
+    /// Alluxio `FileInfo`; fails when the millisecond timestamp cannot be parsed.
+    fn try_from(file_info: FileInfo) -> Result<Metadata> {
+        let mode = if file_info.folder {
+            EntryMode::DIR
+        } else {
+            EntryMode::FILE
+        };
+        let last_modified =
+            parse_datetime_from_from_timestamp_millis(file_info.last_modification_time_ms)?;
+
+        let mut metadata = Metadata::new(mode);
+        metadata
+            .set_content_length(file_info.length)
+            .set_last_modified(last_modified);
+        Ok(metadata)
+    }
+}
+
+/// Alluxio core: shared state and request helpers used by the backend,
+/// pager and writer.
+#[derive(Clone)]
+pub struct AlluxioCore {
+    /// root of this backend.
+    pub root: String,
+    /// endpoint of alluxio
+    pub endpoint: String,
+    /// http client used to send every request
+    pub client: HttpClient,
+}
+
+impl Debug for AlluxioCore {
+    /// Formats `root` and `endpoint` only; the HTTP client is omitted.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut d = f.debug_struct("Backend");
+        d.field("root", &self.root);
+        d.field("endpoint", &self.endpoint);
+        d.finish_non_exhaustive()
+    }
+}
+
+impl AlluxioCore {
+    /// Create the directory at `path` (resolved against `root`), sending
+    /// `recursive: true` in the request body.
+    ///
+    /// Any status other than `200 OK` is converted into an error by `parse_error`.
+    pub async fn create_dir(&self, path: &str) -> Result<()> {
+        let path = build_abs_path(&self.root, path);
+
+        let r = CreateDirRequest {
+            recursive: Some(true),
+        };
+
+        let body = serde_json::to_vec(&r).map_err(new_json_serialize_error)?;
+        let body = bytes::Bytes::from(body);
+        let mut req = Request::post(format!(
+            "{}/api/v1/paths//{}/create-directory",
+            self.endpoint, path
+        ));
+
+        req = req.header("Content-Type", "application/json");
+
+        let req = req
+            .body(AsyncBody::Bytes(body))
+            .map_err(new_request_build_error)?;
+
+        let resp = self.client.send(req).await?;
+
+        let status = resp.status();
+        match status {
+            StatusCode::OK => Ok(()),
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    /// Create the file at `path` (resolved against `root`) and return the id
+    /// of the stream to write its content to.
+    pub async fn create_file(&self, path: &str) -> Result<u64> {
+        let path = build_abs_path(&self.root, path);
+
+        let r = CreateFileRequest {
+            recursive: Some(true),
+        };
+
+        let body = serde_json::to_vec(&r).map_err(new_json_serialize_error)?;
+        let body = bytes::Bytes::from(body);
+        let mut req = Request::post(format!(
+            "{}/api/v1/paths//{}/create-file",
+            self.endpoint, path
+        ));
+
+        req = req.header("Content-Type", "application/json");
+
+        let req = req
+            .body(AsyncBody::Bytes(body))
+            .map_err(new_request_build_error)?;
+
+        let resp = self.client.send(req).await?;
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let body = resp.into_body().bytes().await?;
+                // The response is parsed here, so a failure is a deserialize error.
+                let stream_id: u64 =
+                    serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+                Ok(stream_id)
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    /// Open the file at `path` (resolved against `root`) and return the id of
+    /// the stream to read it from.
+    pub(super) async fn open_file(&self, path: &str) -> Result<u64> {
+        let path = build_abs_path(&self.root, path);
+
+        let req = Request::post(format!(
+            "{}/api/v1/paths//{}/open-file",
+            self.endpoint, path
+        ));
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+        let resp = self.client.send(req).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let body = resp.into_body().bytes().await?;
+                let stream_id: u64 =
+                    serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+                Ok(stream_id)
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    /// Delete the object at `path` (resolved against `root`).
+    pub(super) async fn delete(&self, path: &str) -> Result<()> {
+        let path = build_abs_path(&self.root, path);
+
+        let req = Request::post(format!("{}/api/v1/paths//{}/delete", self.endpoint, path));
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+        let resp = self.client.send(req).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => Ok(()),
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    /// Rename `path` to `dst`; both are resolved against `root`.
+    pub(super) async fn rename(&self, path: &str, dst: &str) -> Result<()> {
+        let path = build_abs_path(&self.root, path);
+        // The destination must live under the same root as the source,
+        // otherwise a non-`/` root would move objects out of the backend.
+        let dst = build_abs_path(&self.root, dst);
+
+        let req = Request::post(format!(
+            "{}/api/v1/paths//{}/rename?dst=/{}",
+            self.endpoint, path, dst
+        ));
+
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        let resp = self.client.send(req).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => Ok(()),
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    /// Fetch the `FileInfo` of the object at `path` (resolved against `root`).
+    pub(super) async fn get_status(&self, path: &str) -> Result<FileInfo> {
+        let path = build_abs_path(&self.root, path);
+
+        let req = Request::post(format!(
+            "{}/api/v1/paths//{}/get-status",
+            self.endpoint, path
+        ));
+
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+        let resp = self.client.send(req).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let body = resp.into_body().bytes().await?;
+                let file_info: FileInfo =
+                    serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+                Ok(file_info)
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    /// List the `FileInfo` of every child of `path` (resolved against `root`).
+    pub(super) async fn list_status(&self, path: &str) -> Result<Vec<FileInfo>> {
+        let path = build_abs_path(&self.root, path);
+
+        let req = Request::post(format!(
+            "{}/api/v1/paths//{}/list-status",
+            self.endpoint, path
+        ));
+
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        let resp = self.client.send(req).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let body = resp.into_body().bytes().await?;
+                let file_infos: Vec<FileInfo> =
+                    serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+                Ok(file_infos)
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    /// Read from the stream `stream_id`, optionally restricted to `range`.
+    ///
+    /// The raw response is returned without checking its status; suffix
+    /// ranges are rejected with `ErrorKind::Unsupported`.
+    pub async fn read(
+        &self,
+        stream_id: u64,
+        range: BytesRange,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let mut req = Request::post(format!(
+            "{}/api/v1/streams/{}/read",
+            self.endpoint, stream_id
+        ));
+
+        if !range.is_full() {
+            // alluxio doesn't support read with suffix range.
+            if range.offset().is_none() && range.size().is_some() {
+                return Err(Error::new(
+                    ErrorKind::Unsupported,
+                    "alluxio doesn't support read with suffix range",
+                ));
+            }
+
+            req = req.header(RANGE, range.to_header());
+        }
+
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.client.send(req).await
+    }
+
+    /// Send `body` to the stream `stream_id`.
+    ///
+    /// On success the returned value is the length of the *response* body.
+    pub(super) async fn write(&self, stream_id: u64, body: AsyncBody) -> Result<usize> {
+        let req = Request::post(format!(
+            "{}/api/v1/streams/{}/write",
+            self.endpoint, stream_id
+        ));
+        let req = req.body(body).map_err(new_request_build_error)?;
+
+        let resp = self.client.send(req).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let body = resp.into_body().bytes().await?;
+                Ok(body.len())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    /// Close the stream `stream_id`.
+    pub(super) async fn close(&self, stream_id: u64) -> Result<()> {
+        let req = Request::post(format!(
+            "{}/api/v1/streams/{}/close",
+            self.endpoint, stream_id
+        ));
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        let resp = self.client.send(req).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => Ok(()),
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+}
diff --git a/core/src/services/alluxio/docs.md b/core/src/services/alluxio/docs.md
new file mode 100644
index 000000000..33775c226
--- /dev/null
+++ b/core/src/services/alluxio/docs.md
@@ -0,0 +1,47 @@
+## Capabilities
+
+This service can be used to:
+
+- [x] stat
+- [x] read
+- [x] write
+- [x] create_dir
+- [x] delete
+- [ ] copy
+- [x] rename
+- [x] list
+- [ ] scan
+- [ ] presign
+- [ ] blocking
+
+## Configuration
+
+- `root`: Set the work directory for backend
+- `endpoint`: Customizable endpoint setting
+
+You can refer to [`AlluxioBuilder`]'s docs for more information
+
+## Example
+
+### Via Builder
+
+```rust
+use anyhow::Result;
+use opendal::services::Alluxio;
+use opendal::Operator;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    // create backend builder
+    let mut builder = Alluxio::default();
+
+    // set the root (work directory) for OpenDAL
+    builder.root("/");
+    // set the endpoint for OpenDAL
+    builder.endpoint("http://127.0.0.1:39999");
+
+    let op: Operator = Operator::new(builder)?.finish();
+
+    Ok(())
+}
+```
diff --git a/core/src/services/alluxio/error.rs b/core/src/services/alluxio/error.rs
new file mode 100644
index 000000000..8a8941b10
--- /dev/null
+++ b/core/src/services/alluxio/error.rs
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Buf;
+use http::Response;
+use serde::Deserialize;
+
+use crate::raw::*;
+use crate::*;
+
+/// The error response of alluxio, deserialized from camelCase JSON keys
+/// (`statusCode`, `message`).
+#[derive(Default, Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+#[allow(dead_code)]
+struct AlluxioError {
+    /// Status name such as `AlreadyExists` or `NotFound`; used to pick the `ErrorKind`.
+    status_code: String,
+    /// Error description sent by the server.
+    message: String,
+}
+
+/// Convert a non-successful Alluxio response into an OpenDAL `Error`.
+///
+/// The body is parsed as an `AlluxioError`; its `statusCode` selects the
+/// `ErrorKind`. A body that is not valid JSON is used verbatim as the message
+/// with `ErrorKind::Unexpected`.
+///
+/// # Errors
+///
+/// Returns `Err` only when reading the response body fails.
+pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
+    let (parts, body) = resp.into_parts();
+    let bs = body.bytes().await?;
+
+    // The HTTP status never influenced the kind (both arms of the old match
+    // were `Unexpected`), so the kind is derived from the body alone.
+    let (kind, message) = match serde_json::from_reader::<_, AlluxioError>(bs.clone().reader()) {
+        Ok(alluxio_err) => {
+            let kind = match alluxio_err.status_code.as_str() {
+                "AlreadyExists" => ErrorKind::AlreadyExists,
+                "NotFound" => ErrorKind::NotFound,
+                "InvalidArgument" => ErrorKind::InvalidInput,
+                _ => ErrorKind::Unexpected,
+            };
+            (kind, format!("{alluxio_err:?}"))
+        }
+        Err(_) => (
+            ErrorKind::Unexpected,
+            String::from_utf8_lossy(&bs).into_owned(),
+        ),
+    };
+
+    let err = Error::new(kind, &message);
+
+    Ok(with_error_response_context(err, parts))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// A camelCase JSON error body must deserialize into `AlluxioError`.
+    #[test]
+    fn test_parse_error() {
+        let bs = bytes::Bytes::from(
+            r#"
+            {
+                "statusCode": "AlreadyExists",
+                "message": "The resource you requested already exist"
+            }
+"#,
+        );
+
+        let out: AlluxioError = serde_json::from_reader(bs.reader()).expect("must success");
+        println!("{out:?}");
+
+        assert_eq!(out.status_code, "AlreadyExists");
+        assert_eq!(out.message, "The resource you requested already exist");
+    }
+}
diff --git a/core/src/services/alluxio/mod.rs b/core/src/services/alluxio/mod.rs
new file mode 100644
index 000000000..707df837a
--- /dev/null
+++ b/core/src/services/alluxio/mod.rs
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod backend;
+pub use backend::AlluxioBuilder as Alluxio;
+pub use backend::AlluxioConfig;
+
+mod core;
+mod error;
+mod pager;
+mod writer;
diff --git a/core/src/services/alluxio/pager.rs b/core/src/services/alluxio/pager.rs
new file mode 100644
index 000000000..148e01b79
--- /dev/null
+++ b/core/src/services/alluxio/pager.rs
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use super::core::AlluxioCore;
+use crate::raw::oio::Entry;
+use crate::raw::*;
+use crate::Result;
+
+/// Pager that lists the children of one path through `AlluxioCore::list_status`.
+pub struct AlluxioPager {
+    /// Shared core used to issue the list request.
+    core: Arc<AlluxioCore>,
+
+    /// Path to list, as passed to `new`.
+    path: String,
+
+    /// Set once a page of entries has been returned; later calls yield `None`.
+    done: bool,
+}
+
+impl AlluxioPager {
+    /// Create a pager for `path` that has not issued any request yet.
+    pub(super) fn new(core: Arc<AlluxioCore>, path: &str) -> Self {
+        let path = path.to_string();
+
+        Self {
+            core,
+            path,
+            done: false,
+        }
+    }
+}
+
+#[async_trait]
+impl oio::Page for AlluxioPager {
+    /// Return all entries under `self.path` on the first call and `None` afterwards.
+    ///
+    /// `list_status` delivers the whole listing in a single response, so there
+    /// is never a second page.
+    async fn next(&mut self) -> Result<Option<Vec<Entry>>> {
+        if self.done {
+            return Ok(None);
+        }
+
+        let file_infos = self.core.list_status(&self.path).await?;
+
+        let mut entries = Vec::with_capacity(file_infos.len());
+        for file_info in file_infos {
+            // Alluxio's `FileInfo.path` is the full Alluxio path, while OpenDAL
+            // entries must be relative to `root` and directories must end with `/`.
+            let path = if file_info.folder {
+                format!("{}/", file_info.path)
+            } else {
+                file_info.path.clone()
+            };
+            entries.push(Entry::new(
+                &build_rel_path(&self.core.root, &path),
+                file_info.try_into()?,
+            ));
+        }
+
+        // The response was exhaustive, even when empty: never list again.
+        self.done = true;
+
+        if entries.is_empty() {
+            return Ok(None);
+        }
+
+        Ok(Some(entries))
+    }
+}
diff --git a/core/src/services/alluxio/writer.rs b/core/src/services/alluxio/writer.rs
new file mode 100644
index 000000000..4eb52ae90
--- /dev/null
+++ b/core/src/services/alluxio/writer.rs
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use crate::raw::*;
+use crate::Result;
+
+use super::core::AlluxioCore;
+
+/// Writer type exposed by the backend: an `AlluxioWriter` wrapped in `OneShotWriter`.
+pub type AlluxioWriters = oio::OneShotWriter<AlluxioWriter>;
+
+/// Writes a whole object to Alluxio in one create / write / close sequence.
+pub struct AlluxioWriter {
+    /// Shared core used to issue the requests.
+    core: Arc<AlluxioCore>,
+
+    /// Write options; stored but not read by this writer.
+    _op: OpWrite,
+    /// Destination path, as passed to `new`.
+    path: String,
+}
+
+impl AlluxioWriter {
+    /// Create a writer for `path`; no request is sent until `write_once` runs.
+    pub fn new(core: Arc<AlluxioCore>, _op: OpWrite, path: String) -> Self {
+        Self { core, _op, path }
+    }
+}
+
+#[async_trait]
+impl oio::OneShotWrite for AlluxioWriter {
+    /// Write all of `bs` to `self.path`: create the file, send the bytes to
+    /// the returned stream, then close the stream.
+    ///
+    /// The first failing step aborts the sequence and its error is returned.
+    async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
+        let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
+
+        let stream_id = self.core.create_file(&self.path).await?;
+
+        self.core
+            .write(stream_id, AsyncBody::ChunkedBytes(bs))
+            .await?;
+
+        self.core.close(stream_id).await?;
+
+        Ok(())
+    }
+}
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index 8ed196f87..9ba0246ca 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -282,3 +282,10 @@ pub use self::dbfs::Dbfs;
 mod swift;
 #[cfg(feature = "services-swift")]
 pub use self::swift::Swift;
+
+#[cfg(feature = "services-alluxio")]
+mod alluxio;
+#[cfg(feature = "services-alluxio")]
+pub use alluxio::Alluxio;
+#[cfg(feature = "services-alluxio")]
+pub use alluxio::AlluxioConfig;
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index ec582a637..224e15b7a 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -66,6 +66,8 @@ pub enum Scheme {
     Hdfs,
     /// [http][crate::services::Http]: HTTP backend.
     Http,
+    /// [alluxio][crate::services::Alluxio]: Alluxio services.
+    Alluxio,
 
     /// [ipmfs][crate::services::Ipfs]: IPFS HTTP Gateway
     Ipfs,
@@ -378,6 +380,7 @@ impl From<Scheme> for &'static str {
             Scheme::Azfile => "azfile",
             Scheme::Sqlite => "sqlite",
             Scheme::Mongodb => "mongodb",
+            Scheme::Alluxio => "alluxio",
             Scheme::Custom(v) => v,
         }
     }

Reply via email to