This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 7b0e3af2 feat: Add wasabi service implementation (#2004)
7b0e3af2 is described below

commit 7b0e3af28a4b9e3dbc7815dbf172f636074dcf6d
Author: Eric <[email protected]>
AuthorDate: Mon Apr 17 12:59:04 2023 +0800

    feat: Add wasabi service implementation (#2004)
    
    * add wasabi service implementation
    
    * sync change from s3, format files
    
    * re-implement wasabi service based on review comments
    
    * fix ci build
---
 core/Cargo.toml                          |    5 +
 core/src/docs/concepts.rs                |    2 +-
 core/src/raw/accessor.rs                 |    2 +-
 core/src/services/mod.rs                 |    5 +
 core/src/services/s3/core.rs             |   17 +-
 core/src/services/wasabi/backend.rs      | 1164 ++++++++++++++++++++++++++++++
 core/src/services/{s3 => wasabi}/core.rs |  152 ++--
 core/src/services/wasabi/error.rs        |  107 +++
 core/src/services/wasabi/mod.rs          |   24 +
 core/src/services/wasabi/pager.rs        |  231 ++++++
 core/src/services/wasabi/writer.rs       |  107 +++
 core/src/types/scheme.rs                 |    4 +
 12 files changed, 1756 insertions(+), 64 deletions(-)

diff --git a/core/Cargo.toml b/core/Cargo.toml
index 3484ec82..f8b3ec78 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -130,6 +130,11 @@ services-s3 = [
   "reqsign?/reqwest_request",
 ]
 services-sled = ["dep:sled"]
+services-wasabi = [
+  "dep:reqsign",
+  "reqsign?/services-aws",
+  "reqsign?/reqwest_request",
+]
 services-webdav = []
 services-webhdfs = []
 
diff --git a/core/src/docs/concepts.rs b/core/src/docs/concepts.rs
index 5e87a988..aa01e6e9 100644
--- a/core/src/docs/concepts.rs
+++ b/core/src/docs/concepts.rs
@@ -19,7 +19,7 @@
 //!
 //! OpenDAL provides a unified abstraction for all storage services.
 //!
-//! There are three core concepts in OpenDAL:
+//! There are two core concepts in OpenDAL:
 //!
 //! - [`Builder`]: Build an instance of underlying services.
 //! - [`Operator`]: A bridge between underlying implementation detail and 
unified abstraction.
diff --git a/core/src/raw/accessor.rs b/core/src/raw/accessor.rs
index 4bf41c39..84d4b7aa 100644
--- a/core/src/raw/accessor.rs
+++ b/core/src/raw/accessor.rs
@@ -53,7 +53,7 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static {
     /// BlockingReader is the associated reader that could return in
     /// `blocking_read` operation.
     type BlockingReader: oio::BlockingRead;
-    /// Reader is the associated writer the could return in `write` operation.
+    /// Writer is the associated writer the could return in `write` operation.
     type Writer: oio::Write;
     /// BlockingWriter is the associated writer the could return in
     /// `blocking_write` operation.
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index 8dff87f3..4c1b7397 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -119,6 +119,11 @@ mod sled;
 #[cfg(feature = "services-sled")]
 pub use self::sled::Sled;
 
+#[cfg(feature = "services-wasabi")]
+mod wasabi;
+#[cfg(feature = "services-wasabi")]
+pub use wasabi::Wasabi;
+
 #[cfg(feature = "services-webdav")]
 mod webdav;
 #[cfg(feature = "services-webdav")]
diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs
index 4f099188..e236f2e5 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/s3/core.rs
@@ -342,22 +342,7 @@ impl S3Core {
         path: &str,
         if_none_match: Option<&str>,
     ) -> Result<Response<IncomingAsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
-
-        let mut req = Request::head(&url);
-
-        // Set SSE headers.
-        req = self.insert_sse_headers(req, false);
-
-        if let Some(if_none_match) = if_none_match {
-            req = req.header(IF_NONE_MATCH, if_none_match);
-        }
-
-        let mut req = req
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
+        let mut req = self.s3_head_object_request(path, if_none_match)?;
 
         self.sign(&mut req).await?;
 
diff --git a/core/src/services/wasabi/backend.rs 
b/core/src/services/wasabi/backend.rs
new file mode 100644
index 00000000..de4735b8
--- /dev/null
+++ b/core/src/services/wasabi/backend.rs
@@ -0,0 +1,1164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::fmt::Write;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use base64::prelude::BASE64_STANDARD;
+use base64::Engine;
+use bytes::Buf;
+use http::StatusCode;
+use log::debug;
+use md5::Digest;
+use md5::Md5;
+use once_cell::sync::Lazy;
+use reqsign::AwsConfig;
+use reqsign::AwsCredentialLoad;
+use reqsign::AwsLoader;
+use reqsign::AwsV4Signer;
+
+use super::core::*;
+use super::error::parse_error;
+use super::pager::WasabiPager;
+use super::writer::WasabiWriter;
+use crate::ops::*;
+use crate::raw::*;
+use crate::*;
+
+/// Allow constructing correct region endpoint if user gives a global endpoint.
+static ENDPOINT_TEMPLATES: Lazy<HashMap<&'static str, &'static str>> = 
Lazy::new(|| {
+    let mut m = HashMap::new();
+    // AWS S3 Service.
+    m.insert(
+        "https://s3.wasabisys.com";,
+        "https://s3.{region}.wasabisys.com";,
+    );
+    m
+});
+
+/// Wasabi (an aws S3 compatible service) support
+///
+/// # Capabilities
+///
+/// This service can be used to:
+///
+/// - [x] read
+/// - [x] write
+/// - [x] copy
+/// - [x] list
+/// - [x] scan
+/// - [x] presign
+/// - [x] rename
+/// - [ ] blocking
+///
+/// # Configuration
+///
+/// - `root`: Set the work dir for backend.
+/// - `bucket`: Set the container name for backend.
+/// - `endpoint`: Set the endpoint for backend.
+/// - `region`: Set the region for backend.
+/// - `access_key_id`: Set the access_key_id for backend.
+/// - `secret_access_key`: Set the secret_access_key for backend.
+/// - `security_token`: Set the security_token for backend.
+/// - `default_storage_class`: Set the default storage_class for backend.
+/// - `server_side_encryption`: Set the server_side_encryption for backend.
+/// - `server_side_encryption_aws_kms_key_id`: Set the 
server_side_encryption_aws_kms_key_id for backend.
+/// - `server_side_encryption_customer_algorithm`: Set the 
server_side_encryption_customer_algorithm for backend.
+/// - `server_side_encryption_customer_key`: Set the 
server_side_encryption_customer_key for backend.
+/// - `server_side_encryption_customer_key_md5`: Set the 
server_side_encryption_customer_key_md5 for backend.
+/// - `disable_config_load`: Disable aws config load from env
+/// - `enable_virtual_host_style`: Enable virtual host style.
+///
+/// Refer to [`WasabiBuilder`]'s public API docs for more information.
+///
+/// # Temporary security credentials
+///
+/// OpenDAL now provides support for S3 temporary security credentials in IAM.
+///
+/// The way to take advantage of this feature is to build your S3 backend with 
`Builder::security_token`.
+///
+/// But OpenDAL will not refresh the temporary security credentials, please 
keep in mind to refresh those credentials in time.
+///
+/// # Server Side Encryption
+///
+/// OpenDAL provides full support of S3 Server Side Encryption(SSE) features.
+///
+/// The easiest way to configure them is to use helper functions like
+///
+/// - SSE-KMS: `server_side_encryption_with_aws_managed_kms_key`
+/// - SSE-KMS: `server_side_encryption_with_customer_managed_kms_key`
+/// - SSE-S3: `server_side_encryption_with_s3_key`
+/// - SSE-C: `server_side_encryption_with_customer_key`
+///
+/// If those functions don't fulfill need, low-level options are also provided:
+///
+/// - Use service managed kms key
+///   - `server_side_encryption="aws:kms"`
+/// - Use customer provided kms key
+///   - `server_side_encryption="aws:kms"`
+///   - `server_side_encryption_aws_kms_key_id="your-kms-key"`
+/// - Use S3 managed key
+///   - `server_side_encryption="AES256"`
+/// - Use customer key
+///   - `server_side_encryption_customer_algorithm="AES256"`
+///   - `server_side_encryption_customer_key="base64-of-your-aes256-key"`
+///   - 
`server_side_encryption_customer_key_md5="base64-of-your-aes256-key-md5"`
+///
+/// After SSE have been configured, all requests send by this backed will 
attach those headers.
+///
+/// Reference: [Protecting data using server-side 
encryption](https://docs.aws.amazon.com/AmazonS3/latest/userguide/serv-side-encryption.html)
+///
+/// # Example
+///
+/// ## Basic Setup
+///
+/// ```no_run
+/// use std::sync::Arc;
+///
+/// use anyhow::Result;
+/// use opendal::services::Wasabi;
+/// use opendal::Operator;
+///
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+///     // Create s3 backend builder.
+///     let mut builder = Wasabi::default();
+///     // Set the root for s3, all operations will happen under this root.
+///     //
+///     // NOTE: the root must be absolute path.
+///     builder.root("/path/to/dir");
+///     // Set the bucket name, this is required.
+///     builder.bucket("test");
+///     // Set the endpoint.
+///     //
+///     // For examples:
+///     // - "https://s3.wasabisys.com";
+///     // - "http://127.0.0.1:9000";
+///     // - "https://oss-ap-northeast-1.aliyuncs.com";
+///     // - "https://cos.ap-seoul.myqcloud.com";
+///     //
+///     // Default to "https://s3.wasabisys.com";
+///     builder.endpoint("https://s3.wasabisys.com";);
+///     // Set the access_key_id and secret_access_key.
+///     //
+///     // OpenDAL will try load credential from the env.
+///     // If credential not set and no valid credential in env, OpenDAL will
+///     // send request without signing like anonymous user.
+///     builder.access_key_id("access_key_id");
+///     builder.secret_access_key("secret_access_key");
+///
+///     let op: Operator = Operator::new(builder)?.finish();
+///
+///     Ok(())
+/// }
+/// ```
+///
+/// ## Wasabi with SSE-C
+///
+/// ```no_run
+/// use anyhow::Result;
+/// use log::info;
+/// use opendal::services::Wasabi;
+/// use opendal::Operator;
+///
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+///     let mut builder = Wasabi::default();
+///
+///     // Setup builders
+///
+///     // Enable SSE-C
+///     builder.server_side_encryption_with_customer_key("AES256", 
"customer_key".as_bytes());
+///
+///     let op = Operator::new(builder)?.finish();
+///     info!("operator: {:?}", op);
+///
+///     // Writing your testing code here.
+///
+///     Ok(())
+/// }
+/// ```
+///
+/// ## Wasabi with SSE-KMS and aws managed kms key
+///
+/// ```no_run
+/// use anyhow::Result;
+/// use log::info;
+/// use opendal::services::Wasabi;
+/// use opendal::Operator;
+///
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+///     let mut builder = Wasabi::default();
+///
+///     // Setup builders
+///
+///     // Enable SSE-KMS with aws managed kms key
+///     builder.server_side_encryption_with_aws_managed_kms_key();
+///
+///     let op = Operator::new(builder)?.finish();
+///     info!("operator: {:?}", op);
+///
+///     // Writing your testing code here.
+///
+///     Ok(())
+/// }
+/// ```
+///
+/// ## Wasabi with SSE-KMS and customer managed kms key
+///
+/// ```no_run
+/// use anyhow::Result;
+/// use log::info;
+/// use opendal::services::Wasabi;
+/// use opendal::Operator;
+///
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+///     let mut builder = Wasabi::default();
+///
+///     // Setup builders
+///
+///     // Enable SSE-KMS with customer managed kms key
+///     
builder.server_side_encryption_with_customer_managed_kms_key("aws_kms_key_id");
+///
+///     let op = Operator::new(builder)?.finish();
+///     info!("operator: {:?}", op);
+///
+///     // Writing your testing code here.
+///
+///     Ok(())
+/// }
+/// ```
+///
+/// ## Wasabi with SSE-S3
+///
+/// ```no_run
+/// use anyhow::Result;
+/// use log::info;
+/// use opendal::services::Wasabi;
+/// use opendal::Operator;
+///
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+///     let mut builder = Wasabi::default();
+///
+///     // Setup builders
+///
+///     // Enable SSE-S3
+///     builder.server_side_encryption_with_s3_key();
+///
+///     let op = Operator::new(builder)?.finish();
+///     info!("operator: {:?}", op);
+///
+///     // Writing your testing code here.
+///
+///     Ok(())
+/// }
+/// ```
+#[derive(Default)]
+pub struct WasabiBuilder {
+    root: Option<String>,
+
+    bucket: String,
+    endpoint: Option<String>,
+    region: Option<String>,
+    role_arn: Option<String>,
+    external_id: Option<String>,
+    access_key_id: Option<String>,
+    secret_access_key: Option<String>,
+    server_side_encryption: Option<String>,
+    server_side_encryption_aws_kms_key_id: Option<String>,
+    server_side_encryption_customer_algorithm: Option<String>,
+    server_side_encryption_customer_key: Option<String>,
+    server_side_encryption_customer_key_md5: Option<String>,
+    default_storage_class: Option<String>,
+
+    /// temporary credentials, check the official 
[doc](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html)
 for detail
+    security_token: Option<String>,
+
+    disable_config_load: bool,
+    disable_ec2_metadata: bool,
+    enable_virtual_host_style: bool,
+
+    http_client: Option<HttpClient>,
+    customed_credential_load: Option<Box<dyn AwsCredentialLoad>>,
+}
+
+impl Debug for WasabiBuilder {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut d = f.debug_struct("Builder");
+
+        d.field("root", &self.root)
+            .field("bucket", &self.bucket)
+            .field("endpoint", &self.endpoint)
+            .field("region", &self.region);
+
+        d.finish_non_exhaustive()
+    }
+}
+
+impl WasabiBuilder {
+    /// Set root of this backend.
+    ///
+    /// All operations will happen under this root.
+    pub fn root(&mut self, root: &str) -> &mut Self {
+        self.root = if root.is_empty() {
+            None
+        } else {
+            Some(root.to_string())
+        };
+
+        self
+    }
+
+    /// Set bucket name of this backend.
+    pub fn bucket(&mut self, bucket: &str) -> &mut Self {
+        self.bucket = bucket.to_string();
+
+        self
+    }
+
+    /// Set endpoint of this backend.
+    ///
+    /// Endpoint must be full uri, e.g.
+    ///
+    /// - `https://s3.wasabisys.com` or `https://s3.{region}.wasabisys.com`
+    ///
+    /// If user inputs endpoint without scheme like "s3.wasabisys.com", we
+    /// will prepend "https://"; before it.
+    pub fn endpoint(&mut self, endpoint: &str) -> &mut Self {
+        if !endpoint.is_empty() {
+            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
+            self.endpoint = Some(endpoint.trim_end_matches('/').to_string())
+        }
+
+        self
+    }
+
+    /// Region represent the signing region of this endpoint.
+    ///
+    /// - If region is set, we will take user's input first.
+    /// - If not, the default `us-east-1` will be used.
+    pub fn region(&mut self, region: &str) -> &mut Self {
+        if !region.is_empty() {
+            self.region = Some(region.to_string())
+        }
+
+        self
+    }
+
+    /// Set access_key_id of this backend.
+    ///
+    /// - If access_key_id is set, we will take user's input first.
+    /// - If not, we will try to load it from environment.
+    pub fn access_key_id(&mut self, v: &str) -> &mut Self {
+        if !v.is_empty() {
+            self.access_key_id = Some(v.to_string())
+        }
+
+        self
+    }
+
+    /// Set secret_access_key of this backend.
+    ///
+    /// - If secret_access_key is set, we will take user's input first.
+    /// - If not, we will try to load it from environment.
+    pub fn secret_access_key(&mut self, v: &str) -> &mut Self {
+        if !v.is_empty() {
+            self.secret_access_key = Some(v.to_string())
+        }
+
+        self
+    }
+
+    /// Set role_arn for this backend.
+    pub fn role_arn(&mut self, v: &str) -> &mut Self {
+        if !v.is_empty() {
+            self.role_arn = Some(v.to_string())
+        }
+
+        self
+    }
+
+    /// Set external_id for this backend.
+    pub fn external_id(&mut self, v: &str) -> &mut Self {
+        if !v.is_empty() {
+            self.external_id = Some(v.to_string())
+        }
+
+        self
+    }
+
+    /// Set default storage_class for this backend.
+    /// Unlike S3, wasabi only supports one single storage class,
+    /// which is most like standard S3 storage class,
+    /// check `https://docs.wasabi.com/docs/operations-on-objects` for more 
details.
+    ///
+    /// Available values:
+    /// - `STANDARD`
+    pub fn default_storage_class(&mut self, v: &str) -> &mut Self {
+        if !v.is_empty() {
+            self.default_storage_class = Some(v.to_string())
+        }
+
+        self
+    }
+
+    /// Set server_side_encryption for this backend.
+    ///
+    /// Available values: `AES256`, `aws:kms`.
+    ///
+    /// # Note
+    ///
+    /// This function is the low-level setting for SSE related features.
+    ///
+    /// SSE related options should be set carefully to make them works.
+    /// Please use `server_side_encryption_with_*` helpers if even possible.
+    pub fn server_side_encryption(&mut self, v: &str) -> &mut Self {
+        if !v.is_empty() {
+            self.server_side_encryption = Some(v.to_string())
+        }
+
+        self
+    }
+
+    /// Set server_side_encryption_aws_kms_key_id for this backend
+    ///
+    /// - If `server_side_encryption` set to `aws:kms`, and 
`server_side_encryption_aws_kms_key_id`
+    /// is not set, S3 will use aws managed kms key to encrypt data.
+    /// - If `server_side_encryption` set to `aws:kms`, and 
`server_side_encryption_aws_kms_key_id`
+    /// is a valid kms key id, S3 will use the provided kms key to encrypt 
data.
+    /// - If the `server_side_encryption_aws_kms_key_id` is invalid or not 
found, an error will be
+    /// returned.
+    /// - If `server_side_encryption` is not `aws:kms`, setting 
`server_side_encryption_aws_kms_key_id`
+    /// is a noop.
+    ///
+    /// # Note
+    ///
+    /// This function is the low-level setting for SSE related features.
+    ///
+    /// SSE related options should be set carefully to make them works.
+    /// Please use `server_side_encryption_with_*` helpers if even possible.
+    pub fn server_side_encryption_aws_kms_key_id(&mut self, v: &str) -> &mut 
Self {
+        if !v.is_empty() {
+            self.server_side_encryption_aws_kms_key_id = Some(v.to_string())
+        }
+
+        self
+    }
+
+    /// Set server_side_encryption_customer_algorithm for this backend.
+    ///
+    /// Available values: `AES256`.
+    ///
+    /// # Note
+    ///
+    /// This function is the low-level setting for SSE related features.
+    ///
+    /// SSE related options should be set carefully to make them works.
+    /// Please use `server_side_encryption_with_*` helpers if even possible.
+    pub fn server_side_encryption_customer_algorithm(&mut self, v: &str) -> 
&mut Self {
+        if !v.is_empty() {
+            self.server_side_encryption_customer_algorithm = 
Some(v.to_string())
+        }
+
+        self
+    }
+
+    /// Set server_side_encryption_customer_key for this backend.
+    ///
+    /// # Args
+    ///
+    /// `v`: base64 encoded key that matches algorithm specified in
+    /// `server_side_encryption_customer_algorithm`.
+    ///
+    /// # Note
+    ///
+    /// This function is the low-level setting for SSE related features.
+    ///
+    /// SSE related options should be set carefully to make them works.
+    /// Please use `server_side_encryption_with_*` helpers if even possible.
+    pub fn server_side_encryption_customer_key(&mut self, v: &str) -> &mut 
Self {
+        if !v.is_empty() {
+            self.server_side_encryption_customer_key = Some(v.to_string())
+        }
+
+        self
+    }
+
+    /// Set server_side_encryption_customer_key_md5 for this backend.
+    ///
+    /// # Args
+    ///
+    /// `v`: MD5 digest of key specified in 
`server_side_encryption_customer_key`.
+    ///
+    /// # Note
+    ///
+    /// This function is the low-level setting for SSE related features.
+    ///
+    /// SSE related options should be set carefully to make them works.
+    /// Please use `server_side_encryption_with_*` helpers if even possible.
+    pub fn server_side_encryption_customer_key_md5(&mut self, v: &str) -> &mut 
Self {
+        if !v.is_empty() {
+            self.server_side_encryption_customer_key_md5 = Some(v.to_string())
+        }
+
+        self
+    }
+
+    /// Enable server side encryption with aws managed kms key
+    ///
+    /// As known as: SSE-KMS
+    ///
+    /// NOTE: This function should not be used along with other 
`server_side_encryption_with_` functions.
+    pub fn server_side_encryption_with_aws_managed_kms_key(&mut self) -> &mut 
Self {
+        self.server_side_encryption = Some("aws:kms".to_string());
+        self
+    }
+
+    /// Enable server side encryption with customer managed kms key
+    ///
+    /// As known as: SSE-KMS
+    ///
+    /// NOTE: This function should not be used along with other 
`server_side_encryption_with_` functions.
+    pub fn server_side_encryption_with_customer_managed_kms_key(
+        &mut self,
+        aws_kms_key_id: &str,
+    ) -> &mut Self {
+        self.server_side_encryption = Some("aws:kms".to_string());
+        self.server_side_encryption_aws_kms_key_id = 
Some(aws_kms_key_id.to_string());
+        self
+    }
+
+    /// Enable server side encryption with s3 managed key
+    ///
+    /// As known as: SSE-S3
+    ///
+    /// NOTE: This function should not be used along with other 
`server_side_encryption_with_` functions.
+    pub fn server_side_encryption_with_s3_key(&mut self) -> &mut Self {
+        self.server_side_encryption = Some("AES256".to_string());
+        self
+    }
+
+    /// Enable server side encryption with customer key.
+    ///
+    /// As known as: SSE-C
+    ///
+    /// NOTE: This function should not be used along with other 
`server_side_encryption_with_` functions.
+    pub fn server_side_encryption_with_customer_key(
+        &mut self,
+        algorithm: &str,
+        key: &[u8],
+    ) -> &mut Self {
+        self.server_side_encryption_customer_algorithm = 
Some(algorithm.to_string());
+        self.server_side_encryption_customer_key = 
Some(BASE64_STANDARD.encode(key));
+        self.server_side_encryption_customer_key_md5 =
+            Some(BASE64_STANDARD.encode(Md5::digest(key).as_slice()));
+        self
+    }
+
+    /// Set temporary credential used in service connections
+    ///
+    /// # Warning
+    ///
+    /// security token's lifetime is short and requires users to refresh in 
time.
+    pub fn security_token(&mut self, token: &str) -> &mut Self {
+        if !token.is_empty() {
+            self.security_token = Some(token.to_string());
+        }
+        self
+    }
+
+    /// Disable config load so that opendal will not load config from
+    /// environment.
+    ///
+    /// For examples:
+    ///
+    /// - envs like `AWS_ACCESS_KEY_ID`
+    /// - files like `~/.aws/config`
+    pub fn disable_config_load(&mut self) -> &mut Self {
+        self.disable_config_load = true;
+        self
+    }
+
+    /// Disable load credential from ec2 metadata.
+    ///
+    /// This option is used to disable the default behavior of opendal
+    /// to load credential from ec2 metadata, a.k.a, IMDSv2
+    pub fn disable_ec2_metadata(&mut self) -> &mut Self {
+        self.disable_ec2_metadata = true;
+        self
+    }
+
+    /// Enable virtual host style so that opendal will send API requests
+    /// in virtual host style instead of path style.
+    ///
+    /// - By default, opendal will send API to 
`https://s3.us-east-1.wasabisys.com/bucket_name`
+    /// - Enabled, opendal will send API to 
`https://bucket_name.s3.us-east-1.wasabisys.com`
+    pub fn enable_virtual_host_style(&mut self) -> &mut Self {
+        self.enable_virtual_host_style = true;
+        self
+    }
+
+    /// Adding a customed credential load for service.
+    pub fn customed_credential_load(&mut self, cred: Box<dyn 
AwsCredentialLoad>) -> &mut Self {
+        self.customed_credential_load = Some(cred);
+        self
+    }
+
+    /// Specify the http client that used by this service.
+    ///
+    /// # Notes
+    ///
+    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
+    /// during minor updates.
+    pub fn http_client(&mut self, client: HttpClient) -> &mut Self {
+        self.http_client = Some(client);
+        self
+    }
+
+    /// Check if `bucket` is valid
+    /// `bucket` must be not empty and if `enable_virtual_host_style` is true
+    /// it couldn't contain dot(.) character
+    fn is_bucket_valid(&self) -> bool {
+        if self.bucket.is_empty() {
+            return false;
+        }
+        // If enable virtual host style, `bucket` will reside in domain part,
+        // for example `https://bucket_name.s3.us-east-1.amazonaws.com`,
+        // so `bucket` with dot can't be recognized correctly for this format.
+        if self.enable_virtual_host_style && self.bucket.contains('.') {
+            return false;
+        }
+        true
+    }
+
+    /// Build endpoint with given region.
+    fn build_endpoint(&self, region: &str) -> String {
+        let bucket = {
+            debug_assert!(self.is_bucket_valid(), "bucket must be valid");
+
+            self.bucket.as_str()
+        };
+
+        let mut endpoint = match &self.endpoint {
+            Some(endpoint) => {
+                if endpoint.starts_with("http") {
+                    endpoint.to_string()
+                } else {
+                    // Prefix https if endpoint doesn't start with scheme.
+                    format!("https://{endpoint}";)
+                }
+            }
+            None => "https://s3.wasabisys.com".to_string(),
+        };
+
+        // If endpoint contains bucket name, we should trim them.
+        endpoint = endpoint.replace(&format!("//{bucket}."), "//");
+
+        // Update with endpoint templates.
+        endpoint = if let Some(template) = 
ENDPOINT_TEMPLATES.get(endpoint.as_str()) {
+            template.replace("{region}", region)
+        } else {
+            // If we don't know where about this endpoint, just leave
+            // them as it.
+            endpoint.to_string()
+        };
+
+        // Apply virtual host style.
+        if self.enable_virtual_host_style {
+            endpoint = endpoint.replace("//", &format!("//{bucket}."))
+        } else {
+            write!(endpoint, "/{bucket}").expect("write into string must 
succeed");
+        };
+
+        endpoint
+    }
+}
+
+impl Builder for WasabiBuilder {
+    const SCHEME: Scheme = Scheme::Wasabi;
+    type Accessor = WasabiBackend;
+
+    fn from_map(map: HashMap<String, String>) -> Self {
+        let mut builder = WasabiBuilder::default();
+
+        map.get("root").map(|v| builder.root(v));
+        map.get("bucket").map(|v| builder.bucket(v));
+        map.get("endpoint").map(|v| builder.endpoint(v));
+        map.get("region").map(|v| builder.region(v));
+        map.get("access_key_id").map(|v| builder.access_key_id(v));
+        map.get("secret_access_key")
+            .map(|v| builder.secret_access_key(v));
+        map.get("security_token").map(|v| builder.security_token(v));
+        map.get("role_arn").map(|v| builder.role_arn(v));
+        map.get("external_id").map(|v| builder.external_id(v));
+        map.get("server_side_encryption")
+            .map(|v| builder.server_side_encryption(v));
+        map.get("server_side_encryption_aws_kms_key_id")
+            .map(|v| builder.server_side_encryption_aws_kms_key_id(v));
+        map.get("server_side_encryption_customer_algorithm")
+            .map(|v| builder.server_side_encryption_customer_algorithm(v));
+        map.get("server_side_encryption_customer_key")
+            .map(|v| builder.server_side_encryption_customer_key(v));
+        map.get("server_side_encryption_customer_key_md5")
+            .map(|v| builder.server_side_encryption_customer_key_md5(v));
+        map.get("disable_config_load")
+            .filter(|v| *v == "on" || *v == "true")
+            .map(|_| builder.disable_config_load());
+        map.get("disable_ec2_metadata")
+            .filter(|v| *v == "on" || *v == "true")
+            .map(|_| builder.disable_ec2_metadata());
+        map.get("enable_virtual_host_style")
+            .filter(|v| *v == "on" || *v == "true")
+            .map(|_| builder.enable_virtual_host_style());
+        map.get("default_storage_class")
+            .map(|v| builder.default_storage_class(v));
+
+        builder
+    }
+
+    fn build(&mut self) -> Result<Self::Accessor> {
+        debug!("backend build started: {:?}", &self);
+
+        let root = normalize_root(&self.root.take().unwrap_or_default());
+        debug!("backend use root {}", &root);
+
+        // Handle bucket name.
+        let bucket = if self.is_bucket_valid() {
+            Ok(&self.bucket)
+        } else {
+            Err(
+                Error::new(ErrorKind::ConfigInvalid, "The bucket is 
misconfigured")
+                    .with_context("service", Scheme::S3),
+            )
+        }?;
+        debug!("backend use bucket {}", &bucket);
+
+        let default_storage_class = match &self.default_storage_class {
+            None => None,
+            Some(v) => Some(
+                build_header_value(v).map_err(|err| err.with_context("key", 
"storage_class"))?,
+            ),
+        };
+
+        let server_side_encryption = match &self.server_side_encryption {
+            None => None,
+            Some(v) => Some(
+                build_header_value(v)
+                    .map_err(|err| err.with_context("key", 
"server_side_encryption"))?,
+            ),
+        };
+
+        let server_side_encryption_aws_kms_key_id =
+            match &self.server_side_encryption_aws_kms_key_id {
+                None => None,
+                Some(v) => Some(build_header_value(v).map_err(|err| {
+                    err.with_context("key", 
"server_side_encryption_aws_kms_key_id")
+                })?),
+            };
+
+        let server_side_encryption_customer_algorithm =
+            match &self.server_side_encryption_customer_algorithm {
+                None => None,
+                Some(v) => Some(build_header_value(v).map_err(|err| {
+                    err.with_context("key", 
"server_side_encryption_customer_algorithm")
+                })?),
+            };
+
+        let server_side_encryption_customer_key =
+            match &self.server_side_encryption_customer_key {
+                None => None,
+                Some(v) => Some(build_header_value(v).map_err(|err| {
+                    err.with_context("key", 
"server_side_encryption_customer_key")
+                })?),
+            };
+
+        let server_side_encryption_customer_key_md5 =
+            match &self.server_side_encryption_customer_key_md5 {
+                None => None,
+                Some(v) => Some(build_header_value(v).map_err(|err| {
+                    err.with_context("key", 
"server_side_encryption_customer_key_md5")
+                })?),
+            };
+
+        let client = if let Some(client) = self.http_client.take() {
+            client
+        } else {
+            HttpClient::new().map_err(|err| {
+                err.with_operation("Builder::build")
+                    .with_context("service", Scheme::S3)
+            })?
+        };
+
+        let mut cfg = AwsConfig::default();
+        if !self.disable_config_load {
+            cfg = cfg.from_profile();
+            cfg = cfg.from_env();
+        }
+
+        // Setting all value from user input if available.
+        if let Some(v) = self.region.take() {
+            cfg.region = Some(v);
+        }
+        if let Some(v) = self.access_key_id.take() {
+            cfg.access_key_id = Some(v)
+        }
+        if let Some(v) = self.secret_access_key.take() {
+            cfg.secret_access_key = Some(v)
+        }
+        if let Some(v) = self.security_token.take() {
+            cfg.session_token = Some(v)
+        }
+        if let Some(v) = self.role_arn.take() {
+            cfg.role_arn = Some(v)
+        }
+        if let Some(v) = self.external_id.take() {
+            cfg.external_id = Some(v)
+        }
+
+        if cfg.region.is_none() {
+            // region is required to make signer work.
+            //
+            // If we don't know region after loading from builder and env.
+            // We will use `us-east-1` as default.
+            cfg.region = Some("us-east-1".to_string());
+        }
+
+        let region = cfg.region.to_owned().unwrap();
+        debug!("backend use region: {region}");
+
+        // Building endpoint.
+        let endpoint = self.build_endpoint(&region);
+        debug!("backend use endpoint: {endpoint}");
+
+        let mut loader = AwsLoader::new(client.client(), 
cfg).with_allow_anonymous();
+        if self.disable_ec2_metadata {
+            loader = loader.with_disable_ec2_metadata();
+        }
+        if let Some(v) = self.customed_credential_load.take() {
+            loader = loader.with_customed_credential_loader(v);
+        }
+
+        let signer = AwsV4Signer::new("s3", &region);
+
+        debug!("backend build finished");
+        Ok(WasabiBackend {
+            core: Arc::new(WasabiCore {
+                bucket: bucket.to_string(),
+                endpoint,
+                root,
+                server_side_encryption,
+                server_side_encryption_aws_kms_key_id,
+                server_side_encryption_customer_algorithm,
+                server_side_encryption_customer_key,
+                server_side_encryption_customer_key_md5,
+                default_storage_class,
+                signer,
+                loader,
+                client,
+            }),
+        })
+    }
+}
+
+/// Backend for wasabi service.
+#[derive(Debug, Clone)]
+pub struct WasabiBackend {
+    core: Arc<WasabiCore>,
+}
+
+#[async_trait]
+impl Accessor for WasabiBackend {
+    type Reader = IncomingAsyncBody;
+    type BlockingReader = ();
+    type Writer = WasabiWriter;
+    type BlockingWriter = ();
+    type Pager = WasabiPager;
+    type BlockingPager = ();
+
+    fn info(&self) -> AccessorInfo {
+        use AccessorCapability::*;
+        use AccessorHint::*;
+
+        let mut am = AccessorInfo::default();
+        am.set_scheme(Scheme::Wasabi)
+            .set_root(&self.core.root)
+            .set_name(&self.core.bucket)
+            .set_max_batch_operations(1000)
+            .set_capabilities(Read | Write | List | Scan | Presign | Batch | 
Copy | Rename)
+            .set_hints(ReadStreamable);
+
+        am
+    }
+
+    async fn create(&self, path: &str, _: OpCreate) -> Result<RpCreate> {
+        let mut req =
+            self.core
+                .put_object_request(path, Some(0), None, None, None, 
AsyncBody::Empty)?;
+
+        self.core.sign(&mut req).await?;
+
+        let resp = self.core.send(req).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::CREATED | StatusCode::OK => {
+                resp.into_body().consume().await?;
+                Ok(RpCreate::default())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
+        let resp = self
+            .core
+            .get_object(path, args.range(), args.if_none_match())
+            .await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
+                let meta = parse_into_metadata(path, resp.headers())?;
+                Ok((RpRead::with_metadata(meta), resp.into_body()))
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
+        Ok((
+            RpWrite::default(),
+            WasabiWriter::new(self.core.clone(), args, path.to_string(), None),
+        ))
+    }
+
+    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> 
Result<RpCopy> {
+        let resp = self.core.copy_object(from, to).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                // According to the documentation, when using copy_object, a 
200 error may occur and we need to detect it.
+                // 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html#API_CopyObject_RequestSyntax
+                resp.into_body().consume().await?;
+
+                Ok(RpCopy::default())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+        // Stat root always returns a DIR.
+        if path == "/" {
+            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
+        }
+
+        let resp = self.core.head_object(path, args.if_none_match()).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => parse_into_metadata(path, 
resp.headers()).map(RpStat::new),
+            StatusCode::NOT_FOUND if path.ends_with('/') => {
+                Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
+        let resp = self.core.delete_object(path).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::NO_CONTENT => Ok(RpDelete::default()),
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Pager)> {
+        Ok((
+            RpList::default(),
+            WasabiPager::new(self.core.clone(), path, "/", args.limit()),
+        ))
+    }
+
+    async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, 
Self::Pager)> {
+        Ok((
+            RpScan::default(),
+            WasabiPager::new(self.core.clone(), path, "", args.limit()),
+        ))
+    }
+
+    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
+        // We will not send this request out, just for signing.
+        let mut req = match args.operation() {
+            PresignOperation::Stat(v) => self.core.head_object_request(path, 
v.if_none_match())?,
+            PresignOperation::Read(v) => self.core.get_object_request(
+                path,
+                v.range(),
+                v.override_content_disposition(),
+                v.override_cache_control(),
+                v.if_none_match(),
+            )?,
+            PresignOperation::Write(_) => {
+                self.core
+                    .put_object_request(path, None, None, None, None, 
AsyncBody::Empty)?
+            }
+        };
+
+        self.core.sign_query(&mut req, args.expire()).await?;
+
+        // We don't need this request anymore, consume it directly.
+        let (parts, _) = req.into_parts();
+
+        Ok(RpPresign::new(PresignedRequest::new(
+            parts.method,
+            parts.uri,
+            parts.headers,
+        )))
+    }
+
+    async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
+        let ops = args.into_operation();
+        if ops.len() > 1000 {
+            return Err(Error::new(
+                ErrorKind::Unsupported,
+                "s3 services only allow delete up to 1000 keys at once",
+            )
+            .with_context("length", ops.len().to_string()));
+        }
+
+        let paths = ops.into_iter().map(|(p, _)| p).collect();
+
+        let resp = self.core.delete_objects(paths).await?;
+
+        let status = resp.status();
+
+        if let StatusCode::OK = status {
+            let bs = resp.into_body().bytes().await?;
+
+            let result: DeleteObjectsResult =
+                
quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
+
+            let mut batched_result = Vec::with_capacity(result.deleted.len() + 
result.error.len());
+            for i in result.deleted {
+                let path = build_rel_path(&self.core.root, &i.key);
+                batched_result.push((path, Ok(RpDelete::default().into())));
+            }
+            // TODO: we should handle those errors with code.
+            for i in result.error {
+                let path = build_rel_path(&self.core.root, &i.key);
+
+                batched_result.push((
+                    path,
+                    Err(Error::new(ErrorKind::Unexpected, &format!("{i:?}"))),
+                ));
+            }
+
+            Ok(RpBatch::new(batched_result))
+        } else {
+            Err(parse_error(resp).await?)
+        }
+    }
+
+    /// Execute rename API call
+    /// Wasabi will auto-create missing path for destination `to` if any
+    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> 
Result<RpRename> {
+        let resp = self.core.rename_object(from, to).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => Ok(RpRename::default()),
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_is_valid_bucket() {
+        let bucket_cases = vec![
+            ("", false, false),
+            ("test", false, true),
+            ("test.xyz", false, true),
+            ("", true, false),
+            ("test", true, true),
+            ("test.xyz", true, false),
+        ];
+
+        for (bucket, enable_virtual_host_style, expected) in bucket_cases {
+            let mut b = WasabiBuilder::default();
+            b.bucket(bucket);
+            if enable_virtual_host_style {
+                b.enable_virtual_host_style();
+            }
+            assert_eq!(b.is_bucket_valid(), expected)
+        }
+    }
+
+    #[test]
+    fn test_build_endpoint() {
+        let _ = env_logger::try_init();
+
+        let endpoint_cases = vec![
+            Some("s3.wasabisys.com"),
+            Some("https://s3.wasabisys.com";),
+            Some("https://s3.us-east-2.amazonaws.com";),
+            None,
+        ];
+
+        for endpoint in &endpoint_cases {
+            let mut b = WasabiBuilder::default();
+            b.bucket("test");
+            if let Some(endpoint) = endpoint {
+                b.endpoint(endpoint);
+            }
+
+            let endpoint = b.build_endpoint("us-east-2");
+            assert_eq!(endpoint, "https://s3.us-east-2.wasabisys.com/test";);
+        }
+
+        for endpoint in &endpoint_cases {
+            let mut b = WasabiBuilder::default();
+            b.bucket("test");
+            b.enable_virtual_host_style();
+            if let Some(endpoint) = endpoint {
+                b.endpoint(endpoint);
+            }
+
+            let endpoint = b.build_endpoint("us-east-2");
+            assert_eq!(endpoint, "https://test.s3.us-east-2.wasabisys.com";);
+        }
+    }
+}
diff --git a/core/src/services/s3/core.rs b/core/src/services/wasabi/core.rs
similarity index 89%
copy from core/src/services/s3/core.rs
copy to core/src/services/wasabi/core.rs
index 4f099188..4fdc385f 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/wasabi/core.rs
@@ -24,11 +24,11 @@ use std::time::Duration;
 use backon::ExponentialBuilder;
 use backon::Retryable;
 use bytes::Bytes;
+use http::header::HeaderName;
 use http::header::CACHE_CONTROL;
 use http::header::CONTENT_DISPOSITION;
 use http::header::CONTENT_LENGTH;
 use http::header::CONTENT_TYPE;
-use http::header::{HeaderName, IF_NONE_MATCH};
 use http::HeaderValue;
 use http::Request;
 use http::Response;
@@ -56,21 +56,27 @@ mod constants {
         "x-amz-server-side-encryption-aws-kms-key-id";
     pub const X_AMZ_STORAGE_CLASS: &str = "x-amz-storage-class";
 
+    #[allow(dead_code)]
     pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: 
&str =
         "x-amz-copy-source-server-side-encryption-customer-algorithm";
+    #[allow(dead_code)]
     pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: &str =
         "x-amz-copy-source-server-side-encryption-customer-key";
+    #[allow(dead_code)]
     pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: &str =
         "x-amz-copy-source-server-side-encryption-customer-key-md5";
 
     pub const RESPONSE_CONTENT_DISPOSITION: &str = 
"response-content-disposition";
     pub const RESPONSE_CACHE_CONTROL: &str = "response-cache-control";
+
+    pub const DESTINATION: &str = "Destination";
+    pub const OVERWRITE: &str = "Overwrite";
 }
 
 static BACKOFF: Lazy<ExponentialBuilder> =
     Lazy::new(|| ExponentialBuilder::default().with_jitter());
 
-pub struct S3Core {
+pub struct WasabiCore {
     pub bucket: String,
     pub endpoint: String,
     pub root: String,
@@ -86,9 +92,9 @@ pub struct S3Core {
     pub client: HttpClient,
 }
 
-impl Debug for S3Core {
+impl Debug for WasabiCore {
     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
-        f.debug_struct("S3Core")
+        f.debug_struct("WasabiCore")
             .field("bucket", &self.bucket)
             .field("endpoint", &self.endpoint)
             .field("root", &self.root)
@@ -96,7 +102,7 @@ impl Debug for S3Core {
     }
 }
 
-impl S3Core {
+impl WasabiCore {
     /// If credential is not found, we will not sign the request.
     async fn load_credential(&self) -> Result<Option<AwsCredential>> {
         let cred = { || self.loader.load() }
@@ -141,7 +147,7 @@ impl S3Core {
     /// # Note
     ///
     /// header like X_AMZ_SERVER_SIDE_ENCRYPTION doesn't need to set while
-    //  get or stat.
+    /// get or stat.
     pub fn insert_sse_headers(
         &self,
         mut req: http::request::Builder,
@@ -200,8 +206,8 @@ impl S3Core {
     }
 }
 
-impl S3Core {
-    pub fn s3_head_object_request(
+impl WasabiCore {
+    pub fn head_object_request(
         &self,
         path: &str,
         if_none_match: Option<&str>,
@@ -215,7 +221,7 @@ impl S3Core {
         req = self.insert_sse_headers(req, false);
 
         if let Some(if_none_match) = if_none_match {
-            req = req.header(IF_NONE_MATCH, if_none_match);
+            req = req.header(http::header::IF_NONE_MATCH, if_none_match);
         }
 
         let req = req
@@ -225,7 +231,7 @@ impl S3Core {
         Ok(req)
     }
 
-    pub fn s3_get_object_request(
+    pub fn get_object_request(
         &self,
         path: &str,
         range: BytesRange,
@@ -265,7 +271,7 @@ impl S3Core {
         }
 
         if let Some(if_none_match) = if_none_match {
-            req = req.header(IF_NONE_MATCH, if_none_match);
+            req = req.header(http::header::IF_NONE_MATCH, if_none_match);
         }
 
         // Set SSE headers.
@@ -279,20 +285,20 @@ impl S3Core {
         Ok(req)
     }
 
-    pub async fn s3_get_object(
+    pub async fn get_object(
         &self,
         path: &str,
         range: BytesRange,
         if_none_match: Option<&str>,
     ) -> Result<Response<IncomingAsyncBody>> {
-        let mut req = self.s3_get_object_request(path, range, None, None, 
if_none_match)?;
+        let mut req = self.get_object_request(path, range, None, None, 
if_none_match)?;
 
         self.sign(&mut req).await?;
 
         self.send(req).await
     }
 
-    pub fn s3_put_object_request(
+    pub fn put_object_request(
         &self,
         path: &str,
         size: Option<usize>,
@@ -337,34 +343,19 @@ impl S3Core {
         Ok(req)
     }
 
-    pub async fn s3_head_object(
+    pub async fn head_object(
         &self,
         path: &str,
         if_none_match: Option<&str>,
     ) -> Result<Response<IncomingAsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
-
-        let mut req = Request::head(&url);
-
-        // Set SSE headers.
-        req = self.insert_sse_headers(req, false);
-
-        if let Some(if_none_match) = if_none_match {
-            req = req.header(IF_NONE_MATCH, if_none_match);
-        }
-
-        let mut req = req
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
+        let mut req = self.head_object_request(path, if_none_match)?;
 
         self.sign(&mut req).await?;
 
         self.send(req).await
     }
 
-    pub async fn s3_delete_object(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
+    pub async fn delete_object(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
         let p = build_abs_path(&self.root, path);
 
         let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
@@ -378,11 +369,7 @@ impl S3Core {
         self.send(req).await
     }
 
-    pub async fn s3_copy_object(
-        &self,
-        from: &str,
-        to: &str,
-    ) -> Result<Response<IncomingAsyncBody>> {
+    pub async fn copy_object(&self, from: &str, to: &str) -> 
Result<Response<IncomingAsyncBody>> {
         let from = build_abs_path(&self.root, from);
         let to = build_abs_path(&self.root, to);
 
@@ -442,7 +429,7 @@ impl S3Core {
 
     /// Make this functions as `pub(suber)` because `DirStream` depends
     /// on this.
-    pub async fn s3_list_objects(
+    pub async fn list_objects(
         &self,
         path: &str,
         continuation_token: &str,
@@ -481,7 +468,7 @@ impl S3Core {
         self.send(req).await
     }
 
-    pub async fn s3_initiate_multipart_upload(
+    pub async fn initiate_multipart_upload(
         &self,
         path: &str,
         content_type: Option<&str>,
@@ -523,7 +510,7 @@ impl S3Core {
         self.send(req).await
     }
 
-    pub fn s3_upload_part_request(
+    pub fn upload_part_request(
         &self,
         path: &str,
         upload_id: &str,
@@ -556,7 +543,7 @@ impl S3Core {
         Ok(req)
     }
 
-    pub async fn s3_complete_multipart_upload(
+    pub async fn complete_multipart_upload(
         &self,
         path: &str,
         upload_id: &str,
@@ -595,7 +582,7 @@ impl S3Core {
     }
 
     /// Abort an on-going multipart upload.
-    pub async fn s3_abort_multipart_upload(
+    pub async fn abort_multipart_upload(
         &self,
         path: &str,
         upload_id: &str,
@@ -616,10 +603,7 @@ impl S3Core {
         self.send(req).await
     }
 
-    pub async fn s3_delete_objects(
-        &self,
-        paths: Vec<String>,
-    ) -> Result<Response<IncomingAsyncBody>> {
+    pub async fn delete_objects(&self, paths: Vec<String>) -> 
Result<Response<IncomingAsyncBody>> {
         let url = format!("{}/?delete", self.endpoint);
 
         let req = Request::post(&url);
@@ -649,6 +633,82 @@ impl S3Core {
 
         self.send(req).await
     }
+
+    pub async fn put_object(
+        &self,
+        path: &str,
+        size: Option<usize>,
+        content_type: Option<&str>,
+        content_disposition: Option<&str>,
+        cache_control: Option<&str>,
+        body: AsyncBody,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let mut req = self.put_object_request(
+            path,
+            size,
+            content_type,
+            content_disposition,
+            cache_control,
+            body,
+        )?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    pub async fn rename_object(&self, from: &str, to: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let from = percent_encode_path(build_abs_path(&self.root, 
from).as_str());
+        let to = percent_encode_path(build_abs_path(&self.root, to).as_str());
+
+        let url = format!("{}/{}", self.endpoint, from);
+
+        let mut req = Request::builder().method("MOVE").uri(url);
+
+        // Set SSE headers.
+        req = self.insert_sse_headers(req, true);
+
+        let mut req = req
+            .header(constants::DESTINATION, to)
+            .header(constants::OVERWRITE, "true")
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    pub async fn append_object(
+        &self,
+        path: &str,
+        size: Option<usize>,
+        body: AsyncBody,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!("{}/{}?append", self.endpoint, 
percent_encode_path(&p));
+
+        let mut req = Request::put(&url);
+
+        if let Some(size) = size {
+            req = req.header(CONTENT_LENGTH, size)
+        }
+
+        // Set storage class header
+        if let Some(v) = &self.default_storage_class {
+            req = 
req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v);
+        }
+
+        // Set SSE headers.
+        req = self.insert_sse_headers(req, true);
+
+        let mut req = req.body(body).map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
 }
 
 /// Result of CreateMultipartUpload
diff --git a/core/src/services/wasabi/error.rs 
b/core/src/services/wasabi/error.rs
new file mode 100644
index 00000000..1eb5407e
--- /dev/null
+++ b/core/src/services/wasabi/error.rs
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Buf;
+use http::Response;
+use http::StatusCode;
+use quick_xml::de;
+use serde::Deserialize;
+
+use crate::raw::*;
+use crate::Error;
+use crate::ErrorKind;
+use crate::Result;
+
+/// WasabiError is the error returned by wasabi service.
+#[derive(Default, Debug, Deserialize)]
+#[serde(default, rename_all = "PascalCase")]
+struct WasabiError {
+    code: String,
+    message: String,
+    resource: String,
+    request_id: String,
+}
+
+/// Parse error response into Error.
+pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
+    let (parts, body) = resp.into_parts();
+    let bs = body.bytes().await?;
+
+    let (mut kind, mut retryable) = match parts.status {
+        StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
+        StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
+        StatusCode::PRECONDITION_FAILED => (ErrorKind::PreconditionFailed, 
false),
+        StatusCode::INTERNAL_SERVER_ERROR
+        | StatusCode::BAD_GATEWAY
+        | StatusCode::SERVICE_UNAVAILABLE
+        | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true),
+        _ => (ErrorKind::Unexpected, false),
+    };
+
+    let (message, wasabi_err) = de::from_reader::<_, 
WasabiError>(bs.clone().reader())
+        .map(|err| (format!("{err:?}"), Some(err)))
+        .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None));
+
+    // All possible error code: 
<https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList>
+    if let Some(wasabi_err) = wasabi_err {
+        (kind, retryable) = match wasabi_err.code.as_str() {
+            // > Your socket connection to the server was not read from
+            // > or written to within the timeout period."
+            //
+            // It's Ok for us to retry it again.
+            "RequestTimeout" => (ErrorKind::Unexpected, true),
+            _ => (kind, retryable),
+        }
+    }
+
+    let mut err = Error::new(kind, &message).with_context("response", 
format!("{parts:?}"));
+
+    if retryable {
+        err = err.set_temporary();
+    }
+
+    Ok(err)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Error response example is from 
https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
+    #[test]
+    fn test_parse_error() {
+        let bs = bytes::Bytes::from(
+            r#"
+<?xml version="1.0" encoding="UTF-8"?>
+<Error>
+  <Code>NoSuchKey</Code>
+  <Message>The resource you requested does not exist</Message>
+  <Resource>/mybucket/myfoto.jpg</Resource>
+  <RequestId>4442587FB7D0A2F9</RequestId>
+</Error>
+"#,
+        );
+
+        let out: WasabiError = de::from_reader(bs.reader()).expect("must 
success");
+        println!("{out:?}");
+
+        assert_eq!(out.code, "NoSuchKey");
+        assert_eq!(out.message, "The resource you requested does not exist");
+        assert_eq!(out.resource, "/mybucket/myfoto.jpg");
+        assert_eq!(out.request_id, "4442587FB7D0A2F9");
+    }
+}
diff --git a/core/src/services/wasabi/mod.rs b/core/src/services/wasabi/mod.rs
new file mode 100644
index 00000000..7356abc0
--- /dev/null
+++ b/core/src/services/wasabi/mod.rs
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod backend;
+pub use backend::WasabiBuilder as Wasabi;
+
+mod core;
+mod error;
+mod pager;
+mod writer;
diff --git a/core/src/services/wasabi/pager.rs 
b/core/src/services/wasabi/pager.rs
new file mode 100644
index 00000000..038d69a1
--- /dev/null
+++ b/core/src/services/wasabi/pager.rs
@@ -0,0 +1,231 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::Buf;
+use quick_xml::de;
+use serde::Deserialize;
+
+use super::core::WasabiCore;
+use super::error::parse_error;
+use crate::raw::*;
+use crate::EntryMode;
+use crate::Metadata;
+use crate::Result;
+
+pub struct WasabiPager {
+    core: Arc<WasabiCore>,
+
+    path: String,
+    delimiter: String,
+    limit: Option<usize>,
+
+    token: String,
+    done: bool,
+}
+
+impl WasabiPager {
+    pub fn new(core: Arc<WasabiCore>, path: &str, delimiter: &str, limit: 
Option<usize>) -> Self {
+        Self {
+            core,
+
+            path: path.to_string(),
+            delimiter: delimiter.to_string(),
+            limit,
+
+            token: "".to_string(),
+            done: false,
+        }
+    }
+}
+
+#[async_trait]
+impl oio::Page for WasabiPager {
+    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
+        if self.done {
+            return Ok(None);
+        }
+
+        let resp = self
+            .core
+            .list_objects(&self.path, &self.token, &self.delimiter, self.limit)
+            .await?;
+
+        if resp.status() != http::StatusCode::OK {
+            return Err(parse_error(resp).await?);
+        }
+
+        let bs = resp.into_body().bytes().await?;
+
+        let output: Output = 
de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
+
+        // Try our best to check whether this list is done.
+        //
+        // - Check `is_truncated`
+        // - Check `next_continuation_token`
+        // - Check the length of `common_prefixes` and `contents` (very rarely 
case)
+        self.done = if let Some(is_truncated) = output.is_truncated {
+            !is_truncated
+        } else if let Some(next_continuation_token) = 
output.next_continuation_token.as_ref() {
+            next_continuation_token.is_empty()
+        } else {
+            output.common_prefixes.is_empty() && output.contents.is_empty()
+        };
+        self.token = 
output.next_continuation_token.clone().unwrap_or_default();
+
+        let mut entries = Vec::with_capacity(output.common_prefixes.len() + 
output.contents.len());
+
+        for prefix in output.common_prefixes {
+            let de = oio::Entry::new(
+                &build_rel_path(&self.core.root, &prefix.prefix),
+                Metadata::new(EntryMode::DIR),
+            );
+
+            entries.push(de);
+        }
+
+        for object in output.contents {
+            // s3 could return the dir itself in contents
+            // which endswith `/`.
+            // We should ignore them.
+            if object.key.ends_with('/') {
+                continue;
+            }
+
+            let mut meta = Metadata::new(EntryMode::FILE);
+
+            meta.set_etag(&object.etag);
+            meta.set_content_md5(object.etag.trim_matches('"'));
+            meta.set_content_length(object.size);
+
+            // object.last_modified provides more precious time that contains
+            // nanosecond, let's trim them.
+            
meta.set_last_modified(parse_datetime_from_rfc3339(object.last_modified.as_str())?);
+
+            let de = oio::Entry::new(&build_rel_path(&self.core.root, 
&object.key), meta);
+
+            entries.push(de);
+        }
+
+        Ok(Some(entries))
+    }
+}
+
+/// Output of ListBucket/ListObjects.
+///
+/// ## Note
+///
+/// Use `Option` in `is_truncated` and `next_continuation_token` to make
+/// the behavior more clear so that we can be compatible to more s3 services.
+///
+/// And enable `serde(default)` so that we can keep going even when some field
+/// is not exist.
+#[derive(Default, Debug, Deserialize)]
+#[serde(default, rename_all = "PascalCase")]
+struct Output {
+    is_truncated: Option<bool>,
+    next_continuation_token: Option<String>,
+    common_prefixes: Vec<OutputCommonPrefix>,
+    contents: Vec<OutputContent>,
+}
+
+#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+struct OutputContent {
+    key: String,
+    size: u64,
+    last_modified: String,
+    #[serde(rename = "ETag")]
+    etag: String,
+}
+
+#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+struct OutputCommonPrefix {
+    prefix: String,
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_parse_list_output() {
+        let bs = bytes::Bytes::from(
+            r#"<ListBucketResult 
xmlns="http://s3.amazonaws.com/doc/2006-03-01/";>
+  <Name>example-bucket</Name>
+  <Prefix>photos/2006/</Prefix>
+  <KeyCount>3</KeyCount>
+  <MaxKeys>1000</MaxKeys>
+  <Delimiter>/</Delimiter>
+  <IsTruncated>false</IsTruncated>
+  <Contents>
+    <Key>photos/2006</Key>
+    <LastModified>2016-04-30T23:51:29.000Z</LastModified>
+    <ETag>"d41d8cd98f00b204e9800998ecf8427e"</ETag>
+    <Size>56</Size>
+    <StorageClass>STANDARD</StorageClass>
+  </Contents>
+  <Contents>
+    <Key>photos/2007</Key>
+    <LastModified>2016-04-30T23:51:29.000Z</LastModified>
+    <ETag>"d41d8cd98f00b204e9800998ecf8427e"</ETag>
+    <Size>100</Size>
+    <StorageClass>STANDARD</StorageClass>
+  </Contents>
+
+  <CommonPrefixes>
+    <Prefix>photos/2006/February/</Prefix>
+  </CommonPrefixes>
+  <CommonPrefixes>
+    <Prefix>photos/2006/January/</Prefix>
+  </CommonPrefixes>
+</ListBucketResult>"#,
+        );
+
+        let out: Output = de::from_reader(bs.reader()).expect("must success");
+
+        assert!(!out.is_truncated.unwrap());
+        assert!(out.next_continuation_token.is_none());
+        assert_eq!(
+            out.common_prefixes
+                .iter()
+                .map(|v| v.prefix.clone())
+                .collect::<Vec<String>>(),
+            vec!["photos/2006/February/", "photos/2006/January/"]
+        );
+        assert_eq!(
+            out.contents,
+            vec![
+                OutputContent {
+                    key: "photos/2006".to_string(),
+                    size: 56,
+                    etag: "\"d41d8cd98f00b204e9800998ecf8427e\"".to_string(),
+                    last_modified: "2016-04-30T23:51:29.000Z".to_string(),
+                },
+                OutputContent {
+                    key: "photos/2007".to_string(),
+                    size: 100,
+                    last_modified: "2016-04-30T23:51:29.000Z".to_string(),
+                    etag: "\"d41d8cd98f00b204e9800998ecf8427e\"".to_string(),
+                }
+            ]
+        )
+    }
+}
diff --git a/core/src/services/wasabi/writer.rs 
b/core/src/services/wasabi/writer.rs
new file mode 100644
index 00000000..90d193df
--- /dev/null
+++ b/core/src/services/wasabi/writer.rs
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use http::StatusCode;
+
+use super::core::*;
+use super::error::parse_error;
+use crate::ops::OpWrite;
+use crate::raw::*;
+use crate::*;
+
+pub struct WasabiWriter {
+    core: Arc<WasabiCore>,
+
+    op: OpWrite,
+    path: String,
+
+    upload_id: Option<String>,
+}
+
+impl WasabiWriter {
+    pub fn new(
+        core: Arc<WasabiCore>,
+        op: OpWrite,
+        path: String,
+        upload_id: Option<String>,
+    ) -> Self {
+        WasabiWriter {
+            core,
+
+            op,
+            path,
+            upload_id,
+        }
+    }
+}
+
+#[async_trait]
+impl oio::Write for WasabiWriter {
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        debug_assert!(
+            self.upload_id.is_none(),
+            "Writer initiated with upload id, but users trying to call write, 
must be buggy"
+        );
+
+        let resp = self
+            .core
+            .put_object(
+                &self.path,
+                Some(bs.len()),
+                self.op.content_type(),
+                self.op.content_disposition(),
+                self.op.cache_control(),
+                AsyncBody::Bytes(bs),
+            )
+            .await?;
+
+        match resp.status() {
+            StatusCode::CREATED | StatusCode::OK => {
+                resp.into_body().consume().await?;
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn append(&mut self, bs: Bytes) -> Result<()> {
+        let resp = self
+            .core
+            .append_object(&self.path, Some(bs.len()), AsyncBody::Bytes(bs))
+            .await?;
+
+        match resp.status() {
+            StatusCode::CREATED | StatusCode::OK | StatusCode::NO_CONTENT => {
+                resp.into_body().consume().await?;
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        Ok(())
+    }
+}
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index c5c1b1d9..aff72868 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -71,6 +71,8 @@ pub enum Scheme {
     S3,
     /// [sled][crate::services::Sled]: Sled services
     Sled,
+    /// [wasabi][crate::services::Wasabi]: Wasabi service
+    Wasabi,
     /// [webdav][crate::services::Webdav]: WebDAV support.
     Webdav,
     /// [webhdfs][crate::services::Webhdfs]: WebHDFS RESTful API Services
@@ -129,6 +131,7 @@ impl FromStr for Scheme {
             "s3" => Ok(Scheme::S3),
             "sled" => Ok(Scheme::Sled),
             "oss" => Ok(Scheme::Oss),
+            "wasabi" => Ok(Scheme::Wasabi),
             "webdav" => Ok(Scheme::Webdav),
             "webhdfs" => Ok(Scheme::Webhdfs),
             _ => Ok(Scheme::Custom(Box::leak(s.into_boxed_str()))),
@@ -159,6 +162,7 @@ impl From<Scheme> for &'static str {
             Scheme::S3 => "s3",
             Scheme::Sled => "sled",
             Scheme::Oss => "oss",
+            Scheme::Wasabi => "wasabi",
             Scheme::Webdav => "webdav",
             Scheme::Webhdfs => "webhdfs",
             Scheme::Custom(v) => v,

Reply via email to