Xuanwo commented on code in PR #3564:
URL: 
https://github.com/apache/incubator-opendal/pull/3564#discussion_r1390134682


##########
core/src/services/alluxio/core.rs:
##########
@@ -0,0 +1,407 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+
+use http::Request;
+
+use http::header::RANGE;
+use http::Response;
+
+use serde::{Deserialize, Serialize};
+
+use crate::raw::*;
+use crate::*;
+
+#[derive(Debug, Serialize)]
+#[serde(untagged)]
+enum Op {
+    CreateFile {
+        /// Whether to create file recursively
+        #[serde(skip_serializing_if = "Option::is_none")]
+        recursive: Option<bool>,
+    },
+    CreateDir {
+        /// Whether to create dir recursively
+        #[serde(skip_serializing_if = "Option::is_none")]
+        recursive: Option<bool>,
+    },
+    OpenFile,
+    Read,
+    Write,
+    Close,
+    Delete,
+    Rename,
+    ListStatus,
+    GetStatus,
+}
+
+/// Metadata of alluxio object
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub(super) struct FileInfo {
+    /// The path of the object
+    pub path: String,
+    /// The last modification time of the object
+    pub last_modification_time_ms: i64,
+    /// Whether the object is a folder
+    pub folder: bool,
+    /// The length of the object in bytes
+    pub length: u64,
+}
+
+impl TryFrom<FileInfo> for Metadata {
+    type Error = Error;
+
+    fn try_from(file_info: FileInfo) -> Result<Metadata> {
+        let mut metadata = if file_info.folder {
+            Metadata::new(EntryMode::DIR)
+        } else {
+            Metadata::new(EntryMode::FILE)
+        };
+        metadata
+            .set_content_length(file_info.length)
+            .set_last_modified(parse_datetime_from_from_timestamp_millis(
+                file_info.last_modification_time_ms,
+            )?);
+        Ok(metadata)
+    }
+}
+
+impl Op {
+    /// Get operation name
+    fn as_str(&self) -> &str {
+        match self {
+            Op::CreateFile { .. } => "create-file",
+            Op::CreateDir { .. } => "create-dir",
+            Op::OpenFile => "open-file",
+            Op::Read => "read",
+            Op::Write => "write",
+            Op::Close => "close",
+            Op::Delete => "delete",
+            Op::Rename => "rename",
+            Op::ListStatus => "list-status",
+            Op::GetStatus => "get-status",
+        }
+    }
+}
+
+/// the status code of alluxio
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
+enum StatusCode {
+    AlreadyExists,
+    NotFound,
+    Internal,
+}
+
+/// the error response of alluxio
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct ErrRsponse {
+    status_code: StatusCode,
+    message: String,
+}
+
+impl From<ErrRsponse> for Error {
+    fn from(value: ErrRsponse) -> Self {
+        match value.status_code {
+            StatusCode::AlreadyExists => Error::new(ErrorKind::AlreadyExists, 
&value.message),
+            StatusCode::NotFound => Error::new(ErrorKind::NotFound, 
&value.message),
+            StatusCode::Internal => Error::new(ErrorKind::Unexpected, 
&value.message),
+        }
+    }
+}
+
+/// Alluxio core
+#[derive(Clone)]
+pub struct AlluxioCore {
+    /// prefix of alluxio api, e.g. "api/v1"
+    pub api_prefix: String,
+    /// prefix of alluxio paths, e.g. "paths"
+    pub paths_prefix: String,
+    /// prefix of alluxio streams, e.g. "streams"
+    pub streams_prefix: String,
+
+    /// root of this backend.
+    pub root: String,
+    /// endpoint of alluxio
+    pub endpoint: String,
+    /// prefix of alluxio
+    pub client: HttpClient,
+}
+
+impl Debug for AlluxioCore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Backend")
+            .field("root", &self.root)
+            .field("endpoint", &self.endpoint)
+            .finish_non_exhaustive()
+    }
+}
+
+impl AlluxioCore {
+    /// join a alluxio path
+    fn join_path(&self, path: &str, op: &str) -> String {
+        let path = build_abs_path(&self.root, path);
+        format!(
+            "{}/{}/{}//{}/{}",

Review Comment:
   Is the `//` expected?



##########
core/src/services/alluxio/core.rs:
##########
@@ -0,0 +1,407 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+
+use http::Request;
+
+use http::header::RANGE;
+use http::Response;
+
+use serde::{Deserialize, Serialize};
+
+use crate::raw::*;
+use crate::*;
+
+#[derive(Debug, Serialize)]
+#[serde(untagged)]
+enum Op {

Review Comment:
   The `Op` seems a bit over-designed to me. Maybe we can define 
`CreateFileRequest` directly?



##########
core/src/services/alluxio/backend.rs:
##########
@@ -0,0 +1,297 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use log::debug;
+use serde::Deserialize;
+
+use crate::raw::oio::MultipartUploadWriter;
+use crate::raw::*;
+use crate::*;
+
+use super::writer::AlluxioWriter;
+use super::writer::AlluxioWriters;
+use super::{core::AlluxioCore, pager::AlluxioPager};
+
+/// Config for alluxio services support.
+#[derive(Default, Deserialize)]
+#[serde(default)]
+#[non_exhaustive]
+pub struct AlluxioConfig {
+    /// root of this backend.
+    ///
+    /// All operations will happen under this root.
+    ///
+    /// default to `/` if not set.
+    pub root: Option<String>,
+    /// endpoint of this backend.
+    ///
+    /// Endpoint must be full uri, mostly like `http://127.0.0.1:39999`.
+    pub endpoint: Option<String>,

Review Comment:
   Does Alluxio not require authentication to access its service?



##########
core/src/services/alluxio/writer.rs:
##########
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use crate::raw::*;
+use crate::*;
+
+use super::core::AlluxioCore;
+
+pub type AlluxioWriters = oio::MultipartUploadWriter<AlluxioWriter>;
+
+pub struct AlluxioWriter {
+    core: Arc<AlluxioCore>,
+
+    _op: OpWrite,
+    path: String,
+}
+
+impl AlluxioWriter {
+    pub fn new(core: Arc<AlluxioCore>, _op: OpWrite, path: String) -> Self {
+        AlluxioWriter { core, _op, path }
+    }
+}
+
+#[async_trait]
+impl oio::MultipartUploadWrite for AlluxioWriter {

Review Comment:
   This is not a `MultipartUploadWrite`. Please take a look at the `fs` service.



##########
Cargo.lock:
##########


Review Comment:
   It's better not to introduce unrelated changes to `Cargo.lock`.



##########
core/src/services/alluxio/backend.rs:
##########
@@ -0,0 +1,297 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use log::debug;
+use serde::Deserialize;
+
+use crate::raw::oio::MultipartUploadWriter;
+use crate::raw::*;
+use crate::*;
+
+use super::writer::AlluxioWriter;
+use super::writer::AlluxioWriters;
+use super::{core::AlluxioCore, pager::AlluxioPager};
+
+/// Config for alluxio services support.
+#[derive(Default, Deserialize)]
+#[serde(default)]
+#[non_exhaustive]
+pub struct AlluxioConfig {
+    /// root of this backend.
+    ///
+    /// All operations will happen under this root.
+    ///
+    /// default to `/` if not set.
+    pub root: Option<String>,
+    /// endpoint of this backend.
+    ///
+    /// Endpoint must be full uri, mostly like `http://127.0.0.1:39999`.
+    pub endpoint: Option<String>,
+}
+
+impl Debug for AlluxioConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut d = f.debug_struct("AlluxioConfig");
+
+        d.field("root", &self.root)
+            .field("endpoint", &self.endpoint);
+
+        d.finish_non_exhaustive()
+    }
+}
+
+/// [Alluxio](https://www.alluxio.io/) services support.
+#[doc = include_str!("docs.md")]
+#[derive(Default)]
+pub struct AlluxioBuilder {
+    config: AlluxioConfig,
+
+    http_client: Option<HttpClient>,
+}
+
+impl Debug for AlluxioBuilder {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut d = f.debug_struct("AlluxioBuilder");
+
+        d.field("config", &self.config);
+        d.finish_non_exhaustive()
+    }
+}
+
+impl AlluxioBuilder {
+    /// Set root of this backend.
+    ///
+    /// All operations will happen under this root.
+    pub fn root(&mut self, root: &str) -> &mut Self {
+        self.config.root = if root.is_empty() {
+            None
+        } else {
+            Some(root.to_string())
+        };
+
+        self
+    }
+
+    /// endpoint of this backend.
+    ///
+    /// Endpoint must be full uri, mostly like `http://127.0.0.1:39999`.
+    pub fn endpoint(&mut self, endpoint: &str) -> &mut Self {
+        if !endpoint.is_empty() {
+            // Trim trailing `/` so that we can accept 
`http://127.0.0.1:39999/`
+            self.config.endpoint = 
Some(endpoint.trim_end_matches('/').to_string())
+        }
+
+        self
+    }
+
+    /// Specify the http client that used by this service.
+    ///
+    /// # Notes
+    ///
+    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
+    /// during minor updates.
+    pub fn http_client(&mut self, client: HttpClient) -> &mut Self {
+        self.http_client = Some(client);
+        self
+    }
+}
+
+impl Builder for AlluxioBuilder {
+    const SCHEME: Scheme = Scheme::Alluxio;
+    type Accessor = AlluxioBackend;
+
+    /// Converts a HashMap into an AlluxioBuilder instance.
+    ///
+    /// # Arguments
+    ///
+    /// * `map` - A HashMap containing the configuration values.
+    ///
+    /// # Returns
+    ///
+    /// Returns an instance of AlluxioBuilder.
+    fn from_map(map: HashMap<String, String>) -> Self {
+        // Deserialize the configuration from the HashMap.
+        let config = AlluxioConfig::deserialize(ConfigDeserializer::new(map))
+            .expect("config deserialize must succeed");
+
+        // Create an AlluxioBuilder instance with the deserialized config.
+        AlluxioBuilder {
+            config,
+            http_client: None,
+        }
+    }
+
+    /// Builds the backend and returns the result of AlluxioBackend.
+    fn build(&mut self) -> Result<Self::Accessor> {
+        debug!("backend build started: {:?}", &self);
+
+        let root = 
normalize_root(&self.config.root.clone().unwrap_or_default());
+        debug!("backend use root {}", &root);
+
+        let endpoint = match &self.config.endpoint {
+            Some(endpoint) => Ok(endpoint.clone()),
+            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is 
empty")
+                .with_operation("Builder::build")
+                .with_context("service", Scheme::Azfile)),
+        }?;
+        debug!("backend use endpoint {}", &endpoint);
+
+        let client = if let Some(client) = self.http_client.take() {
+            client
+        } else {
+            HttpClient::new().map_err(|err| {
+                err.with_operation("Builder::build")
+                    .with_context("service", Scheme::S3)
+            })?
+        };
+
+        Ok(AlluxioBackend {
+            core: Arc::new(AlluxioCore {
+                api_prefix: "api/v1".to_string(),

Review Comment:
   It seems we can store these as `&'static str` if they never change at runtime.



##########
core/src/services/alluxio/pager.rs:
##########
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use super::core::AlluxioCore;
+use crate::raw::oio::Entry;
+use crate::raw::*;
+use crate::Result;
+
+pub struct AlluxioPager {
+    core: Arc<AlluxioCore>,
+
+    path: String,
+
+    done: bool,
+}
+
+impl AlluxioPager {
+    pub(super) fn new(core: Arc<AlluxioCore>, path: &str) -> Self {
+        AlluxioPager {
+            core,
+            path: path.to_string(),
+            done: false,
+        }
+    }
+}
+
+#[async_trait]
+impl oio::Page for AlluxioPager {
+    async fn next(&mut self) -> Result<Option<Vec<Entry>>> {
+        if self.done {
+            return Ok(None);
+        }
+
+        let file_infos = self.core.list_status(&self.path).await?;

Review Comment:
   Will `list_status` always list all files?



##########
core/src/services/alluxio/core.rs:
##########
@@ -0,0 +1,407 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+
+use http::Request;
+
+use http::header::RANGE;
+use http::Response;
+
+use serde::{Deserialize, Serialize};
+
+use crate::raw::*;
+use crate::*;
+
+#[derive(Debug, Serialize)]
+#[serde(untagged)]
+enum Op {
+    CreateFile {
+        /// Whether to create file recursively
+        #[serde(skip_serializing_if = "Option::is_none")]
+        recursive: Option<bool>,
+    },
+    CreateDir {
+        /// Whether to create dir recursively
+        #[serde(skip_serializing_if = "Option::is_none")]
+        recursive: Option<bool>,
+    },
+    OpenFile,
+    Read,
+    Write,
+    Close,
+    Delete,
+    Rename,
+    ListStatus,
+    GetStatus,
+}
+
+/// Metadata of alluxio object
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub(super) struct FileInfo {
+    /// The path of the object
+    pub path: String,
+    /// The last modification time of the object
+    pub last_modification_time_ms: i64,
+    /// Whether the object is a folder
+    pub folder: bool,
+    /// The length of the object in bytes
+    pub length: u64,
+}
+
+impl TryFrom<FileInfo> for Metadata {
+    type Error = Error;
+
+    fn try_from(file_info: FileInfo) -> Result<Metadata> {
+        let mut metadata = if file_info.folder {
+            Metadata::new(EntryMode::DIR)
+        } else {
+            Metadata::new(EntryMode::FILE)
+        };
+        metadata
+            .set_content_length(file_info.length)
+            .set_last_modified(parse_datetime_from_from_timestamp_millis(
+                file_info.last_modification_time_ms,
+            )?);
+        Ok(metadata)
+    }
+}
+
+impl Op {
+    /// Get operation name
+    fn as_str(&self) -> &str {
+        match self {
+            Op::CreateFile { .. } => "create-file",
+            Op::CreateDir { .. } => "create-dir",
+            Op::OpenFile => "open-file",
+            Op::Read => "read",
+            Op::Write => "write",
+            Op::Close => "close",
+            Op::Delete => "delete",
+            Op::Rename => "rename",
+            Op::ListStatus => "list-status",
+            Op::GetStatus => "get-status",
+        }
+    }
+}
+
+/// the status code of alluxio
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
+enum StatusCode {
+    AlreadyExists,
+    NotFound,
+    Internal,
+}
+
+/// the error response of alluxio
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct ErrRsponse {
+    status_code: StatusCode,
+    message: String,
+}
+
+impl From<ErrRsponse> for Error {

Review Comment:
   Please don't impl `From<ErrRsponse> for Error`, which could leak errors 
unexpectedly. Please use a function like `parse_error_response(ErrResponse) -> 
Error` instead.



##########
core/src/services/alluxio/core.rs:
##########
@@ -0,0 +1,407 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+
+use http::Request;
+
+use http::header::RANGE;
+use http::Response;
+
+use serde::{Deserialize, Serialize};
+
+use crate::raw::*;
+use crate::*;
+
+#[derive(Debug, Serialize)]
+#[serde(untagged)]
+enum Op {
+    CreateFile {
+        /// Whether to create file recursively
+        #[serde(skip_serializing_if = "Option::is_none")]
+        recursive: Option<bool>,
+    },
+    CreateDir {
+        /// Whether to create dir recursively
+        #[serde(skip_serializing_if = "Option::is_none")]
+        recursive: Option<bool>,
+    },
+    OpenFile,
+    Read,
+    Write,
+    Close,
+    Delete,
+    Rename,
+    ListStatus,
+    GetStatus,
+}
+
+/// Metadata of alluxio object
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub(super) struct FileInfo {
+    /// The path of the object
+    pub path: String,
+    /// The last modification time of the object
+    pub last_modification_time_ms: i64,
+    /// Whether the object is a folder
+    pub folder: bool,
+    /// The length of the object in bytes
+    pub length: u64,
+}
+
+impl TryFrom<FileInfo> for Metadata {
+    type Error = Error;
+
+    fn try_from(file_info: FileInfo) -> Result<Metadata> {
+        let mut metadata = if file_info.folder {
+            Metadata::new(EntryMode::DIR)
+        } else {
+            Metadata::new(EntryMode::FILE)
+        };
+        metadata
+            .set_content_length(file_info.length)
+            .set_last_modified(parse_datetime_from_from_timestamp_millis(
+                file_info.last_modification_time_ms,
+            )?);
+        Ok(metadata)
+    }
+}
+
+impl Op {
+    /// Get operation name
+    fn as_str(&self) -> &str {
+        match self {
+            Op::CreateFile { .. } => "create-file",
+            Op::CreateDir { .. } => "create-dir",
+            Op::OpenFile => "open-file",
+            Op::Read => "read",
+            Op::Write => "write",
+            Op::Close => "close",
+            Op::Delete => "delete",
+            Op::Rename => "rename",
+            Op::ListStatus => "list-status",
+            Op::GetStatus => "get-status",
+        }
+    }
+}
+
+/// the status code of alluxio
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
+enum StatusCode {
+    AlreadyExists,
+    NotFound,
+    Internal,
+}
+
+/// the error response of alluxio
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct ErrRsponse {
+    status_code: StatusCode,
+    message: String,
+}
+
+impl From<ErrRsponse> for Error {
+    fn from(value: ErrRsponse) -> Self {
+        match value.status_code {
+            StatusCode::AlreadyExists => Error::new(ErrorKind::AlreadyExists, 
&value.message),
+            StatusCode::NotFound => Error::new(ErrorKind::NotFound, 
&value.message),
+            StatusCode::Internal => Error::new(ErrorKind::Unexpected, 
&value.message),
+        }
+    }
+}
+
+/// Alluxio core
+#[derive(Clone)]
+pub struct AlluxioCore {
+    /// prefix of alluxio api, e.g. "api/v1"
+    pub api_prefix: String,
+    /// prefix of alluxio paths, e.g. "paths"
+    pub paths_prefix: String,
+    /// prefix of alluxio streams, e.g. "streams"
+    pub streams_prefix: String,
+
+    /// root of this backend.
+    pub root: String,
+    /// endpoint of alluxio
+    pub endpoint: String,
+    /// prefix of alluxio
+    pub client: HttpClient,
+}
+
+impl Debug for AlluxioCore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Backend")
+            .field("root", &self.root)
+            .field("endpoint", &self.endpoint)
+            .finish_non_exhaustive()
+    }
+}
+
+impl AlluxioCore {
+    /// join a alluxio path
+    fn join_path(&self, path: &str, op: &str) -> String {
+        let path = build_abs_path(&self.root, path);
+        format!(
+            "{}/{}/{}//{}/{}",
+            self.endpoint, self.api_prefix, self.paths_prefix, path, op
+        )
+    }
+
+    /// join a alluxio stream
+    fn join_stream(&self, id: u64, op: &str) -> String {
+        format!(
+            "{}/{}/{}/{}/{}",
+            self.endpoint, self.api_prefix, self.streams_prefix, id, op
+        )
+    }
+
+    pub async fn create_dir(&self, path: &str) -> Result<()> {
+        let op = Op::CreateDir {
+            recursive: Some(true),
+        };
+
+        let body = serde_json::to_vec(&op).map_err(new_json_serialize_error)?;
+        let body = bytes::Bytes::from(body);
+        let req = Request::post(self.join_path(path, op.as_str()));
+        let req = req
+            .body(AsyncBody::Bytes(body))
+            .map_err(new_request_build_error)?;
+        let resp = self.client.send(req).await?;
+
+        let status = resp.status();
+        let body = resp.into_body().bytes().await?;
+        if status.is_success() {
+            return Ok(());
+        }
+        let err_response: ErrRsponse =
+            serde_json::from_slice(&body).map_err(new_json_serialize_error)?;
+        Err(err_response.into())
+    }
+
+    pub async fn create_file(&self, path: &str) -> Result<u64> {
+        let op = Op::CreateFile {
+            recursive: Some(true),
+        };
+
+        let body = serde_json::to_vec(&op).map_err(new_json_serialize_error)?;
+        let body = bytes::Bytes::from(body);
+        let mut req = Request::post(self.join_path(path, op.as_str()));

Review Comment:
   How about writing the URL directly so that it is easier to understand?



##########
core/src/services/alluxio/backend.rs:
##########
@@ -0,0 +1,296 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use log::debug;
+use serde::Deserialize;
+
+use crate::raw::oio::MultipartUploadWriter;
+use crate::raw::*;
+use crate::*;
+
+use super::writer::AlluxioWriter;
+use super::writer::AlluxioWriters;
+use super::{core::AlluxioCore, pager::AlluxioPager};
+
+/// Config for alluxio services support.
+#[derive(Default, Deserialize)]
+#[serde(default)]
+#[non_exhaustive]
+pub struct AlluxioConfig {
+    /// root of this backend.
+    ///
+    /// All operations will happen under this root.
+    ///
+    /// default to `/` if not set.
+    pub root: Option<String>,
+    /// endpoint of this backend.
+    ///
+    /// Endpoint must be full uri, mostly like `http://127.0.0.1:39999`.
+    pub endpoint: Option<String>,
+}
+
+impl Debug for AlluxioConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut d = f.debug_struct("AlluxioConfig");
+
+        d.field("root", &self.root)
+            .field("endpoint", &self.endpoint);
+
+        d.finish_non_exhaustive()
+    }
+}
+
+/// [Alluxio](https://www.alluxio.io/) services support.
+#[doc = include_str!("docs.md")]
+#[derive(Default)]
+pub struct AlluxioBuilder {
+    config: AlluxioConfig,
+
+    http_client: Option<HttpClient>,
+}
+
+impl Debug for AlluxioBuilder {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut d = f.debug_struct("AlluxioBuilder");
+
+        d.field("config", &self.config);
+        d.finish_non_exhaustive()
+    }
+}
+
+impl AlluxioBuilder {
+    /// Set root of this backend.
+    ///
+    /// All operations will happen under this root.
+    pub fn root(&mut self, root: &str) -> &mut Self {
+        self.config.root = if root.is_empty() {
+            None
+        } else {
+            Some(root.to_string())
+        };
+
+        self
+    }
+
+    /// endpoint of this backend.
+    ///
+    /// Endpoint must be full uri, mostly like `http://127.0.0.1:39999`.
+    pub fn endpoint(&mut self, endpoint: &str) -> &mut Self {
+        if !endpoint.is_empty() {
+            // Trim trailing `/` so that we can accept 
`http://127.0.0.1:39999/`
+            self.config.endpoint = 
Some(endpoint.trim_end_matches('/').to_string())
+        }
+
+        self
+    }
+
+    /// Specify the http client that used by this service.
+    ///
+    /// # Notes
+    ///
+    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
+    /// during minor updates.
+    pub fn http_client(&mut self, client: HttpClient) -> &mut Self {
+        self.http_client = Some(client);
+        self
+    }
+}
+
+impl Builder for AlluxioBuilder {
+    const SCHEME: Scheme = Scheme::Alluxio;
+    type Accessor = AlluxioBackend;
+
+    /// Converts a HashMap into an AlluxioBuilder instance.
+    ///
+    /// # Arguments
+    ///
+    /// * `map` - A HashMap containing the configuration values.
+    ///
+    /// # Returns
+    ///
+    /// Returns an instance of AlluxioBuilder.
+    fn from_map(map: HashMap<String, String>) -> Self {
+        // Deserialize the configuration from the HashMap.
+        let config = AlluxioConfig::deserialize(ConfigDeserializer::new(map))
+            .expect("config deserialize must succeed");
+
+        // Create an AlluxioBuilder instance with the deserialized config.
+        AlluxioBuilder {
+            config,
+            http_client: None,
+        }
+    }
+
+    /// Builds the backend and returns the result of AlluxioBackend.
+    fn build(&mut self) -> Result<Self::Accessor> {
+        debug!("backend build started: {:?}", &self);
+
+        let root = 
normalize_root(&self.config.root.clone().unwrap_or_default());
+        debug!("backend use root {}", &root);
+
+        let endpoint = match &self.config.endpoint {
+            Some(endpoint) => Ok(endpoint.clone()),
+            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is 
empty")
+                .with_operation("Builder::build")
+                .with_context("service", Scheme::Azfile)),
+        }?;
+        debug!("backend use endpoint {}", &endpoint);
+
+        let client = if let Some(client) = self.http_client.take() {
+            client
+        } else {
+            HttpClient::new().map_err(|err| {
+                err.with_operation("Builder::build")
+                    .with_context("service", Scheme::S3)
+            })?
+        };
+
+        Ok(AlluxioBackend {
+            core: Arc::new(AlluxioCore {
+                api_prefix: "api/v1".to_string(),
+                paths_prefix: "paths".to_string(),
+                streams_prefix: "streams".to_string(),
+
+                root,
+                endpoint,
+                client,
+            }),
+        })
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct AlluxioBackend {
+    core: Arc<AlluxioCore>,
+}
+
+#[async_trait]
+impl Accessor for AlluxioBackend {
+    type Reader = IncomingAsyncBody;
+    type BlockingReader = ();
+    type Writer = AlluxioWriters;
+    type BlockingWriter = ();
+    type Pager = AlluxioPager;
+    type BlockingPager = ();
+
+    fn info(&self) -> AccessorInfo {
+        let mut am = AccessorInfo::default();
+        am.set_scheme(Scheme::Alluxio)
+            .set_root(&self.core.root)
+            .set_native_capability(Capability {
+                stat: true,
+
+                read: true,
+
+                write: true,
+                /// https://github.com/Alluxio/alluxio/issues/8212
+                write_can_append: false,
+
+                create_dir: true,
+                delete: true,
+                rename: true,
+
+                list: true,
+
+                ..Default::default()
+            });
+
+        am
+    }
+
+    async fn create_dir(&self, path: &str, _: OpCreateDir) -> 
Result<RpCreateDir> {
+        self.core.create_dir(path).await?;
+        Ok(RpCreateDir::default())
+    }
+
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
+        let stream_id = self.core.open_file(path).await?;
+        debug!("stream_id: {}", stream_id);

Review Comment:
   Please don't add debug logs for service-internal operations.



##########
core/src/services/alluxio/core.rs:
##########
@@ -0,0 +1,407 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+
+use http::Request;
+
+use http::header::RANGE;
+use http::Response;
+
+use serde::{Deserialize, Serialize};
+
+use crate::raw::*;
+use crate::*;
+
+/// Operations of the Alluxio REST API used by this service.
+///
+/// The enum is `untagged`, so serializing a struct variant emits only its
+/// fields (e.g. `{"recursive":true}`), without the variant name.
+#[derive(Debug, Serialize)]
+#[serde(untagged)]
+enum Op {
+    /// Create a file.
+    CreateFile {
+        /// Whether to create file recursively
+        #[serde(skip_serializing_if = "Option::is_none")]
+        recursive: Option<bool>,
+    },
+    /// Create a directory.
+    CreateDir {
+        /// Whether to create dir recursively
+        #[serde(skip_serializing_if = "Option::is_none")]
+        recursive: Option<bool>,
+    },
+    /// Open a file.
+    OpenFile,
+    /// Read from a stream.
+    Read,
+    /// Write to a stream.
+    Write,
+    /// Close a stream.
+    Close,
+    /// Delete a path.
+    Delete,
+    /// Rename a path.
+    Rename,
+    /// List the status of the entries under a path.
+    ListStatus,
+    /// Get the status of a single path.
+    GetStatus,
+}
+
+/// Metadata of an Alluxio object.
+///
+/// Deserialized from JSON with camelCase keys (e.g. `lastModificationTimeMs`).
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub(super) struct FileInfo {
+    /// The path of the object
+    pub path: String,
+    /// The last modification time of the object, in milliseconds
+    pub last_modification_time_ms: i64,
+    /// Whether the object is a folder
+    pub folder: bool,
+    /// The length of the object in bytes
+    pub length: u64,
+}
+
+/// Conversion from an Alluxio file status into [`Metadata`].
+impl TryFrom<FileInfo> for Metadata {
+    type Error = Error;
+
+    /// Map `folder` to the entry mode and copy the length and last-modified time.
+    ///
+    /// Fails when the millisecond timestamp cannot be turned into a datetime.
+    fn try_from(file_info: FileInfo) -> Result<Metadata> {
+        let mode = if file_info.folder {
+            EntryMode::DIR
+        } else {
+            EntryMode::FILE
+        };
+        let last_modified =
+            parse_datetime_from_from_timestamp_millis(file_info.last_modification_time_ms)?;
+
+        let mut metadata = Metadata::new(mode);
+        metadata
+            .set_content_length(file_info.length)
+            .set_last_modified(last_modified);
+        Ok(metadata)
+    }
+}
+
+impl Op {
+    /// Get the operation name used as the last URL segment of a request.
+    ///
+    /// The returned names are string literals, so the result is `'static`
+    /// and does not borrow from `self`.
+    fn as_str(&self) -> &'static str {
+        match self {
+            Op::CreateFile { .. } => "create-file",
+            // The Alluxio REST API names this endpoint `create-directory`.
+            Op::CreateDir { .. } => "create-directory",
+            Op::OpenFile => "open-file",
+            Op::Read => "read",
+            Op::Write => "write",
+            Op::Close => "close",
+            Op::Delete => "delete",
+            Op::Rename => "rename",
+            Op::ListStatus => "list-status",
+            Op::GetStatus => "get-status",
+        }
+    }
+}
+
+/// The status code carried by an Alluxio error response.
+///
+/// Deserialized from SCREAMING_SNAKE_CASE strings such as `ALREADY_EXISTS`.
+/// NOTE(review): only these three codes are listed; any other value will
+/// presumably fail deserialization — confirm against the Alluxio REST API.
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
+enum StatusCode {
+    AlreadyExists,
+    NotFound,
+    Internal,
+}
+
+/// The error response of Alluxio, deserialized from JSON with camelCase
+/// keys (`statusCode`, `message`).
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct ErrRsponse {
+    /// Error category reported by the server.
+    status_code: StatusCode,
+    /// Error message reported by the server.
+    message: String,
+}
+
+impl From<ErrRsponse> for Error {
+    /// Translate an Alluxio error response into an [`Error`], choosing the
+    /// kind from the status code and keeping the server-provided message.
+    fn from(value: ErrRsponse) -> Self {
+        let kind = match value.status_code {
+            StatusCode::AlreadyExists => ErrorKind::AlreadyExists,
+            StatusCode::NotFound => ErrorKind::NotFound,
+            StatusCode::Internal => ErrorKind::Unexpected,
+        };
+        Error::new(kind, &value.message)
+    }
+}
+
+/// Alluxio core: the configuration and HTTP client used to build and send
+/// requests to the Alluxio REST API.
+#[derive(Clone)]
+pub struct AlluxioCore {
+    /// prefix of alluxio api, e.g. "api/v1"
+    pub api_prefix: String,
+    /// prefix of alluxio paths, e.g. "paths"
+    pub paths_prefix: String,
+    /// prefix of alluxio streams, e.g. "streams"
+    pub streams_prefix: String,
+
+    /// root of this backend.
+    pub root: String,
+    /// endpoint of alluxio
+    pub endpoint: String,
+    /// HTTP client used to send requests
+    pub client: HttpClient,
+}
+
+impl Debug for AlluxioCore {
+    /// Print only `root` and `endpoint`; the remaining fields are elided.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut out = f.debug_struct("Backend");
+        out.field("root", &self.root);
+        out.field("endpoint", &self.endpoint);
+        out.finish_non_exhaustive()
+    }
+}
+
+impl AlluxioCore {
+    /// Build the URL for operation `op` on `path`, resolved against the root.
+    ///
+    /// The empty segment reproduces the doubled slash in front of the path;
+    /// presumably `build_abs_path` returns no leading `/` — TODO confirm.
+    fn join_path(&self, path: &str, op: &str) -> String {
+        let abs = build_abs_path(&self.root, path);
+        [
+            self.endpoint.as_str(),
+            self.api_prefix.as_str(),
+            self.paths_prefix.as_str(),
+            "",
+            abs.as_str(),
+            op,
+        ]
+        .join("/")
+    }
+
+    /// Build the URL for operation `op` on the stream identified by `id`.
+    fn join_stream(&self, id: u64, op: &str) -> String {
+        let id = id.to_string();
+        [
+            self.endpoint.as_str(),
+            self.api_prefix.as_str(),
+            self.streams_prefix.as_str(),
+            id.as_str(),
+            op,
+        ]
+        .join("/")
+    }
+
+    /// Create the directory at `path`, sending `recursive: true`.
+    ///
+    /// Returns `Ok(())` on a success status; otherwise the JSON error body
+    /// is converted into an [`Error`].
+    pub async fn create_dir(&self, path: &str) -> Result<()> {
+        let op = Op::CreateDir {
+            recursive: Some(true),
+        };
+
+        let body = serde_json::to_vec(&op).map_err(new_json_serialize_error)?;
+        let req = Request::post(self.join_path(path, op.as_str()))
+            // The body is JSON, so declare it the same way `create_file` does.
+            .header("Content-Type", "application/json")
+            .body(AsyncBody::Bytes(bytes::Bytes::from(body)))
+            .map_err(new_request_build_error)?;
+        let resp = self.client.send(req).await?;
+
+        let status = resp.status();
+        let body = resp.into_body().bytes().await?;
+        if status.is_success() {
+            return Ok(());
+        }
+
+        // Parsing the error payload is deserialization, not serialization.
+        let err_response: ErrRsponse =
+            serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+        Err(err_response.into())
+    }
+
+    /// Create the file at `path`, sending `recursive: true`.
+    ///
+    /// On a success status the response body is parsed as the `u64` stream id
+    /// and returned; otherwise the JSON error body is converted into an [`Error`].
+    pub async fn create_file(&self, path: &str) -> Result<u64> {
+        let op = Op::CreateFile {
+            recursive: Some(true),
+        };
+
+        let body = serde_json::to_vec(&op).map_err(new_json_serialize_error)?;
+        let req = Request::post(self.join_path(path, op.as_str()))
+            .header("Content-Type", "application/json")
+            // The buffer is moved into the request; no clone is needed.
+            .body(AsyncBody::Bytes(bytes::Bytes::from(body)))
+            .map_err(new_request_build_error)?;
+
+        let resp = self.client.send(req).await?;
+
+        let status = resp.status();
+        let body = resp.into_body().bytes().await?;
+
+        if status.is_success() {
+            let stream_id: u64 =
+                serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+            return Ok(stream_id);
+        }
+
+        let err_response: ErrRsponse =
+            serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+        Err(err_response.into())
+    }
+
+    /// Open the file at `path`.
+    ///
+    /// On a success status the response body is parsed as the `u64` stream id
+    /// and returned; otherwise the JSON error body is converted into an [`Error`].
+    pub(super) async fn open_file(&self, path: &str) -> Result<u64> {
+        let op = Op::OpenFile;
+
+        let req = Request::post(self.join_path(path, op.as_str()))
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+        let resp = self.client.send(req).await?;
+
+        let status = resp.status();
+        let body = resp.into_body().bytes().await?;
+
+        if status.is_success() {
+            let stream_id: u64 =
+                serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+            return Ok(stream_id);
+        }
+
+        // Parsing the error payload is deserialization, not serialization.
+        let err_response: ErrRsponse =
+            serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+        Err(err_response.into())
+    }
+
+    /// Delete `path`.
+    ///
+    /// Returns `Ok(())` on a success status; otherwise the JSON error body
+    /// is converted into an [`Error`].
+    pub(super) async fn delete(&self, path: &str) -> Result<()> {
+        let op = Op::Delete;
+
+        let req = Request::post(self.join_path(path, op.as_str()))
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+        let resp = self.client.send(req).await?;
+
+        let status = resp.status();
+        let body = resp.into_body().bytes().await?;
+
+        if status.is_success() {
+            return Ok(());
+        }
+
+        // Parsing the error payload is deserialization, not serialization.
+        let err_response: ErrRsponse =
+            serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+        Err(err_response.into())
+    }
+
+    pub(super) async fn rename(&self, path: &str, dst: &str) -> Result<()> {
+        let op = Op::Rename;
+
+        let req = Request::post(format!(
+            "{}?dst=/{}",
+            self.join_path(path, op.as_str()),
+            dst

Review Comment:
   Do we need to URL-encode the path?



##########
core/src/services/alluxio/core.rs:
##########
@@ -0,0 +1,407 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+
+use http::Request;
+
+use http::header::RANGE;
+use http::Response;
+
+use serde::{Deserialize, Serialize};
+
+use crate::raw::*;
+use crate::*;
+
+/// Operations of the Alluxio REST API used by this service.
+///
+/// The enum is `untagged`, so serializing a struct variant emits only its
+/// fields (e.g. `{"recursive":true}`), without the variant name.
+#[derive(Debug, Serialize)]
+#[serde(untagged)]
+enum Op {
+    /// Create a file.
+    CreateFile {
+        /// Whether to create file recursively
+        #[serde(skip_serializing_if = "Option::is_none")]
+        recursive: Option<bool>,
+    },
+    /// Create a directory.
+    CreateDir {
+        /// Whether to create dir recursively
+        #[serde(skip_serializing_if = "Option::is_none")]
+        recursive: Option<bool>,
+    },
+    /// Open a file.
+    OpenFile,
+    /// Read from a stream.
+    Read,
+    /// Write to a stream.
+    Write,
+    /// Close a stream.
+    Close,
+    /// Delete a path.
+    Delete,
+    /// Rename a path.
+    Rename,
+    /// List the status of the entries under a path.
+    ListStatus,
+    /// Get the status of a single path.
+    GetStatus,
+}
+
+/// Metadata of an Alluxio object.
+///
+/// Deserialized from JSON with camelCase keys (e.g. `lastModificationTimeMs`).
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub(super) struct FileInfo {
+    /// The path of the object
+    pub path: String,
+    /// The last modification time of the object, in milliseconds
+    pub last_modification_time_ms: i64,
+    /// Whether the object is a folder
+    pub folder: bool,
+    /// The length of the object in bytes
+    pub length: u64,
+}
+
+/// Conversion from an Alluxio file status into [`Metadata`].
+impl TryFrom<FileInfo> for Metadata {
+    type Error = Error;
+
+    /// Map `folder` to the entry mode and copy the length and last-modified time.
+    ///
+    /// Fails when the millisecond timestamp cannot be turned into a datetime.
+    fn try_from(file_info: FileInfo) -> Result<Metadata> {
+        let mode = if file_info.folder {
+            EntryMode::DIR
+        } else {
+            EntryMode::FILE
+        };
+        let last_modified =
+            parse_datetime_from_from_timestamp_millis(file_info.last_modification_time_ms)?;
+
+        let mut metadata = Metadata::new(mode);
+        metadata
+            .set_content_length(file_info.length)
+            .set_last_modified(last_modified);
+        Ok(metadata)
+    }
+}
+
+impl Op {
+    /// Get the operation name used as the last URL segment of a request.
+    ///
+    /// The returned names are string literals, so the result is `'static`
+    /// and does not borrow from `self`.
+    fn as_str(&self) -> &'static str {
+        match self {
+            Op::CreateFile { .. } => "create-file",
+            // The Alluxio REST API names this endpoint `create-directory`.
+            Op::CreateDir { .. } => "create-directory",
+            Op::OpenFile => "open-file",
+            Op::Read => "read",
+            Op::Write => "write",
+            Op::Close => "close",
+            Op::Delete => "delete",
+            Op::Rename => "rename",
+            Op::ListStatus => "list-status",
+            Op::GetStatus => "get-status",
+        }
+    }
+}
+
+/// The status code carried by an Alluxio error response.
+///
+/// Deserialized from SCREAMING_SNAKE_CASE strings such as `ALREADY_EXISTS`.
+/// NOTE(review): only these three codes are listed; any other value will
+/// presumably fail deserialization — confirm against the Alluxio REST API.
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
+enum StatusCode {
+    AlreadyExists,
+    NotFound,
+    Internal,
+}
+
+/// The error response of Alluxio, deserialized from JSON with camelCase
+/// keys (`statusCode`, `message`).
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct ErrRsponse {
+    /// Error category reported by the server.
+    status_code: StatusCode,
+    /// Error message reported by the server.
+    message: String,
+}
+
+impl From<ErrRsponse> for Error {
+    /// Translate an Alluxio error response into an [`Error`], choosing the
+    /// kind from the status code and keeping the server-provided message.
+    fn from(value: ErrRsponse) -> Self {
+        let kind = match value.status_code {
+            StatusCode::AlreadyExists => ErrorKind::AlreadyExists,
+            StatusCode::NotFound => ErrorKind::NotFound,
+            StatusCode::Internal => ErrorKind::Unexpected,
+        };
+        Error::new(kind, &value.message)
+    }
+}
+
+/// Alluxio core: the configuration and HTTP client used to build and send
+/// requests to the Alluxio REST API.
+#[derive(Clone)]
+pub struct AlluxioCore {
+    /// prefix of alluxio api, e.g. "api/v1"
+    pub api_prefix: String,
+    /// prefix of alluxio paths, e.g. "paths"
+    pub paths_prefix: String,
+    /// prefix of alluxio streams, e.g. "streams"
+    pub streams_prefix: String,
+
+    /// root of this backend.
+    pub root: String,
+    /// endpoint of alluxio
+    pub endpoint: String,
+    /// HTTP client used to send requests
+    pub client: HttpClient,
+}
+
+impl Debug for AlluxioCore {
+    /// Print only `root` and `endpoint`; the remaining fields are elided.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut out = f.debug_struct("Backend");
+        out.field("root", &self.root);
+        out.field("endpoint", &self.endpoint);
+        out.finish_non_exhaustive()
+    }
+}
+
+impl AlluxioCore {
+    /// join a alluxio path
+    fn join_path(&self, path: &str, op: &str) -> String {

Review Comment:
   There are too many internal functions here. Maybe we can write them directly, 
since they are only used internally?



##########
core/src/services/alluxio/core.rs:
##########
@@ -0,0 +1,407 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+
+use http::Request;
+
+use http::header::RANGE;
+use http::Response;
+
+use serde::{Deserialize, Serialize};
+
+use crate::raw::*;
+use crate::*;
+
+/// Operations of the Alluxio REST API used by this service.
+///
+/// The enum is `untagged`, so serializing a struct variant emits only its
+/// fields (e.g. `{"recursive":true}`), without the variant name.
+#[derive(Debug, Serialize)]
+#[serde(untagged)]
+enum Op {
+    /// Create a file.
+    CreateFile {
+        /// Whether to create file recursively
+        #[serde(skip_serializing_if = "Option::is_none")]
+        recursive: Option<bool>,
+    },
+    /// Create a directory.
+    CreateDir {
+        /// Whether to create dir recursively
+        #[serde(skip_serializing_if = "Option::is_none")]
+        recursive: Option<bool>,
+    },
+    /// Open a file.
+    OpenFile,
+    /// Read from a stream.
+    Read,
+    /// Write to a stream.
+    Write,
+    /// Close a stream.
+    Close,
+    /// Delete a path.
+    Delete,
+    /// Rename a path.
+    Rename,
+    /// List the status of the entries under a path.
+    ListStatus,
+    /// Get the status of a single path.
+    GetStatus,
+}
+
+/// Metadata of an Alluxio object.
+///
+/// Deserialized from JSON with camelCase keys (e.g. `lastModificationTimeMs`).
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub(super) struct FileInfo {
+    /// The path of the object
+    pub path: String,
+    /// The last modification time of the object, in milliseconds
+    pub last_modification_time_ms: i64,
+    /// Whether the object is a folder
+    pub folder: bool,
+    /// The length of the object in bytes
+    pub length: u64,
+}
+
+/// Conversion from an Alluxio file status into [`Metadata`].
+impl TryFrom<FileInfo> for Metadata {
+    type Error = Error;
+
+    /// Map `folder` to the entry mode and copy the length and last-modified time.
+    ///
+    /// Fails when the millisecond timestamp cannot be turned into a datetime.
+    fn try_from(file_info: FileInfo) -> Result<Metadata> {
+        let mode = if file_info.folder {
+            EntryMode::DIR
+        } else {
+            EntryMode::FILE
+        };
+        let last_modified =
+            parse_datetime_from_from_timestamp_millis(file_info.last_modification_time_ms)?;
+
+        let mut metadata = Metadata::new(mode);
+        metadata
+            .set_content_length(file_info.length)
+            .set_last_modified(last_modified);
+        Ok(metadata)
+    }
+}
+
+impl Op {
+    /// Get the operation name used as the last URL segment of a request.
+    ///
+    /// The returned names are string literals, so the result is `'static`
+    /// and does not borrow from `self`.
+    fn as_str(&self) -> &'static str {
+        match self {
+            Op::CreateFile { .. } => "create-file",
+            // The Alluxio REST API names this endpoint `create-directory`.
+            Op::CreateDir { .. } => "create-directory",
+            Op::OpenFile => "open-file",
+            Op::Read => "read",
+            Op::Write => "write",
+            Op::Close => "close",
+            Op::Delete => "delete",
+            Op::Rename => "rename",
+            Op::ListStatus => "list-status",
+            Op::GetStatus => "get-status",
+        }
+    }
+}
+
+/// The status code carried by an Alluxio error response.
+///
+/// Deserialized from SCREAMING_SNAKE_CASE strings such as `ALREADY_EXISTS`.
+/// NOTE(review): only these three codes are listed; any other value will
+/// presumably fail deserialization — confirm against the Alluxio REST API.
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
+enum StatusCode {
+    AlreadyExists,
+    NotFound,
+    Internal,
+}
+
+/// The error response of Alluxio, deserialized from JSON with camelCase
+/// keys (`statusCode`, `message`).
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct ErrRsponse {
+    /// Error category reported by the server.
+    status_code: StatusCode,
+    /// Error message reported by the server.
+    message: String,
+}
+
+impl From<ErrRsponse> for Error {
+    /// Translate an Alluxio error response into an [`Error`], choosing the
+    /// kind from the status code and keeping the server-provided message.
+    fn from(value: ErrRsponse) -> Self {
+        let kind = match value.status_code {
+            StatusCode::AlreadyExists => ErrorKind::AlreadyExists,
+            StatusCode::NotFound => ErrorKind::NotFound,
+            StatusCode::Internal => ErrorKind::Unexpected,
+        };
+        Error::new(kind, &value.message)
+    }
+}
+
+/// Alluxio core: the configuration and HTTP client used to build and send
+/// requests to the Alluxio REST API.
+#[derive(Clone)]
+pub struct AlluxioCore {
+    /// prefix of alluxio api, e.g. "api/v1"
+    pub api_prefix: String,
+    /// prefix of alluxio paths, e.g. "paths"
+    pub paths_prefix: String,
+    /// prefix of alluxio streams, e.g. "streams"
+    pub streams_prefix: String,
+
+    /// root of this backend.
+    pub root: String,
+    /// endpoint of alluxio
+    pub endpoint: String,
+    /// HTTP client used to send requests
+    pub client: HttpClient,
+}
+
+impl Debug for AlluxioCore {
+    /// Print only `root` and `endpoint`; the remaining fields are elided.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut out = f.debug_struct("Backend");
+        out.field("root", &self.root);
+        out.field("endpoint", &self.endpoint);
+        out.finish_non_exhaustive()
+    }
+}
+
+impl AlluxioCore {
+    /// Build the URL for operation `op` on `path`, resolved against the root.
+    ///
+    /// The empty segment reproduces the doubled slash in front of the path;
+    /// presumably `build_abs_path` returns no leading `/` — TODO confirm.
+    fn join_path(&self, path: &str, op: &str) -> String {
+        let abs = build_abs_path(&self.root, path);
+        [
+            self.endpoint.as_str(),
+            self.api_prefix.as_str(),
+            self.paths_prefix.as_str(),
+            "",
+            abs.as_str(),
+            op,
+        ]
+        .join("/")
+    }
+
+    /// Build the URL for operation `op` on the stream identified by `id`.
+    fn join_stream(&self, id: u64, op: &str) -> String {
+        let id = id.to_string();
+        [
+            self.endpoint.as_str(),
+            self.api_prefix.as_str(),
+            self.streams_prefix.as_str(),
+            id.as_str(),
+            op,
+        ]
+        .join("/")
+    }
+
+    /// Create the directory at `path`, sending `recursive: true`.
+    ///
+    /// Returns `Ok(())` on a success status; otherwise the JSON error body
+    /// is converted into an [`Error`].
+    pub async fn create_dir(&self, path: &str) -> Result<()> {
+        let op = Op::CreateDir {
+            recursive: Some(true),
+        };
+
+        let body = serde_json::to_vec(&op).map_err(new_json_serialize_error)?;
+        let req = Request::post(self.join_path(path, op.as_str()))
+            // The body is JSON, so declare it the same way `create_file` does.
+            .header("Content-Type", "application/json")
+            .body(AsyncBody::Bytes(bytes::Bytes::from(body)))
+            .map_err(new_request_build_error)?;
+        let resp = self.client.send(req).await?;
+
+        let status = resp.status();
+        let body = resp.into_body().bytes().await?;
+        if status.is_success() {
+            return Ok(());
+        }
+
+        // Parsing the error payload is deserialization, not serialization.
+        let err_response: ErrRsponse =
+            serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+        Err(err_response.into())
+    }
+
+    /// Create the file at `path`, sending `recursive: true`.
+    ///
+    /// On a success status the response body is parsed as the `u64` stream id
+    /// and returned; otherwise the JSON error body is converted into an [`Error`].
+    pub async fn create_file(&self, path: &str) -> Result<u64> {
+        let op = Op::CreateFile {
+            recursive: Some(true),
+        };
+
+        let body = serde_json::to_vec(&op).map_err(new_json_serialize_error)?;
+        let req = Request::post(self.join_path(path, op.as_str()))
+            .header("Content-Type", "application/json")
+            // The buffer is moved into the request; no clone is needed.
+            .body(AsyncBody::Bytes(bytes::Bytes::from(body)))
+            .map_err(new_request_build_error)?;
+
+        let resp = self.client.send(req).await?;
+
+        let status = resp.status();
+        let body = resp.into_body().bytes().await?;
+
+        if status.is_success() {
+            let stream_id: u64 =
+                serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+            return Ok(stream_id);
+        }
+
+        let err_response: ErrRsponse =
+            serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+        Err(err_response.into())
+    }
+
+    /// Open the file at `path`.
+    ///
+    /// On a success status the response body is parsed as the `u64` stream id
+    /// and returned; otherwise the JSON error body is converted into an [`Error`].
+    pub(super) async fn open_file(&self, path: &str) -> Result<u64> {
+        let op = Op::OpenFile;
+
+        let req = Request::post(self.join_path(path, op.as_str()))
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+        let resp = self.client.send(req).await?;
+
+        let status = resp.status();
+        let body = resp.into_body().bytes().await?;
+
+        if status.is_success() {
+            let stream_id: u64 =
+                serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+            return Ok(stream_id);
+        }
+
+        // Parsing the error payload is deserialization, not serialization.
+        let err_response: ErrRsponse =
+            serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+        Err(err_response.into())
+    }
+
+    /// Delete `path`.
+    ///
+    /// Returns `Ok(())` on a success status; otherwise the JSON error body
+    /// is converted into an [`Error`].
+    pub(super) async fn delete(&self, path: &str) -> Result<()> {
+        let op = Op::Delete;
+
+        let req = Request::post(self.join_path(path, op.as_str()))
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+        let resp = self.client.send(req).await?;
+
+        let status = resp.status();
+        let body = resp.into_body().bytes().await?;
+
+        if status.is_success() {
+            return Ok(());
+        }
+
+        // Parsing the error payload is deserialization, not serialization.
+        let err_response: ErrRsponse =
+            serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+        Err(err_response.into())
+    }
+
+    /// Rename `path` to `dst`; both are resolved against the backend root.
+    ///
+    /// Returns `Ok(())` on a success status; otherwise the JSON error body
+    /// is converted into an [`Error`].
+    pub(super) async fn rename(&self, path: &str, dst: &str) -> Result<()> {
+        let op = Op::Rename;
+
+        // Resolve the destination against the root exactly like the source;
+        // otherwise a non-default root would move the object outside of it.
+        let dst = build_abs_path(&self.root, dst);
+        // NOTE(review): neither path is URL-encoded yet — confirm whether the
+        // Alluxio proxy requires it.
+        let url = format!("{}?dst=/{}", self.join_path(path, op.as_str()), dst);
+
+        let req = Request::post(url)
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        let resp = self.client.send(req).await?;
+
+        let status = resp.status();
+        let body = resp.into_body().bytes().await?;
+
+        if status.is_success() {
+            return Ok(());
+        }
+
+        // Parsing the error payload is deserialization, not serialization.
+        let err_response: ErrRsponse =
+            serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+        Err(err_response.into())
+    }
+
+    /// Fetch the status of `path`.
+    ///
+    /// On a success status the response body is parsed as a [`FileInfo`];
+    /// otherwise the JSON error body is converted into an [`Error`].
+    pub(super) async fn get_status(&self, path: &str) -> Result<FileInfo> {
+        let op = Op::GetStatus;
+
+        let req = Request::post(self.join_path(path, op.as_str()))
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+        let resp = self.client.send(req).await?;
+
+        let status = resp.status();
+        let body = resp.into_body().bytes().await?;
+
+        if status.is_success() {
+            let file_info: FileInfo =
+                serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+            return Ok(file_info);
+        }
+
+        // Same mapper as the success path: this is deserialization as well.
+        let err_response: ErrRsponse =
+            serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+        Err(err_response.into())
+    }
+
+    /// List the status of the entries under `path`.
+    ///
+    /// On a success status the response body is parsed as a list of
+    /// [`FileInfo`]; otherwise the JSON error body is converted into an [`Error`].
+    pub(super) async fn list_status(&self, path: &str) -> Result<Vec<FileInfo>> {
+        let op = Op::ListStatus;
+
+        let req = Request::post(self.join_path(path, op.as_str()))
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        let resp = self.client.send(req).await?;
+
+        let status = resp.status();
+        let body = resp.into_body().bytes().await?;
+
+        if status.is_success() {
+            let file_infos: Vec<FileInfo> =
+                serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+            return Ok(file_infos);
+        }
+
+        // Same mapper as the success path: this is deserialization as well.
+        let err_response: ErrRsponse =
+            serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+        Err(err_response.into())
+    }
+
+    pub async fn read(
+        &self,
+        stream_id: u64,
+        range: BytesRange,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let op = Op::Read;
+
+        let mut req = Request::post(self.join_stream(stream_id, op.as_str()));

Review Comment:
   Maybe we can return the stream id in the reader directly so that we don't need 
to call `open_file` every time users call `read`?
   
   We can treat this as a future improvement.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to