This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch drop-wasabi
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 1a2bb992c2aa4d381379f8873061448b1d63372f
Author: Xuanwo <[email protected]>
AuthorDate: Wed Nov 1 19:36:15 2023 +0800

    refactor(services/wasabi)!: Remove native support for wasabi services
    
    Signed-off-by: Xuanwo <[email protected]>
---
 .github/workflows/service_test_wasabi.yml |  68 ---
 core/Cargo.toml                           |   9 +-
 core/src/services/mod.rs                  |   5 -
 core/src/services/wasabi/backend.rs       | 951 ------------------------------
 core/src/services/wasabi/core.rs          | 911 ----------------------------
 core/src/services/wasabi/docs.md          | 198 -------
 core/src/services/wasabi/error.rs         | 131 ----
 core/src/services/wasabi/mod.rs           |  24 -
 core/src/services/wasabi/pager.rs         | 231 --------
 core/src/services/wasabi/writer.rs        |  65 --
 core/src/types/operator/builder.rs        |   2 -
 core/src/types/scheme.rs                  |   6 -
 12 files changed, 4 insertions(+), 2597 deletions(-)

diff --git a/.github/workflows/service_test_wasabi.yml 
b/.github/workflows/service_test_wasabi.yml
deleted file mode 100644
index f8dd015af..000000000
--- a/.github/workflows/service_test_wasabi.yml
+++ /dev/null
@@ -1,68 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-name: Service Test Wasabi
-
-on:
-  push:
-    branches:
-      - main
-  pull_request:
-    branches:
-      - main
-    paths:
-      - "core/src/**"
-      - "core/tests/**"
-      - "!core/src/docs/**"
-      - "!core/src/services/**"
-      - "core/src/services/wasabi/**"
-      - ".github/workflows/service_test_wasabi.yml"
-
-concurrency:
-  group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
-  cancel-in-progress: true
-
-jobs:
-  wasabi:
-    runs-on: ubuntu-latest
-    if: (github.event_name == 'push' && github.repository == 
'apache/incubator-opendal') || !github.event.pull_request.head.repo.fork
-    steps:
-      - uses: actions/checkout@v4
-      - name: Setup Rust toolchain
-        uses: ./.github/actions/setup
-        with:
-          need-nextest: true
-
-      - name: Load secret
-        id: op-load-secret
-        uses: 1password/load-secrets-action@v1
-        with:
-          export-env: true
-        env:
-          OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }}
-          OPENDAL_WASABI_REGION: op://services/wasabi/region
-          OPENDAL_WASABI_BUCKET: op://services/wasabi/bucket
-          OPENDAL_WASABI_ENDPOINT: op://services/wasabi/endpoint
-          OPENDAL_WASABI_ACCESS_KEY_ID: op://services/wasabi/access_key_id
-          OPENDAL_WASABI_SECRET_ACCESS_KEY: 
op://services/wasabi/secret_access_key
-
-      - name: Test
-        shell: bash
-        working-directory: core
-        run: cargo nextest run behavior --features tests,services-wasabi
-        env:
-          OPENDAL_TEST: wasabi
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 8bed9d249..e76769d42 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -186,11 +186,10 @@ services-sqlite = ["dep:rusqlite", "dep:r2d2"]
 services-supabase = []
 services-tikv = ["tikv-client"]
 services-vercel-artifacts = []
-services-wasabi = [
-  "dep:reqsign",
-  "reqsign?/services-aws",
-  "reqsign?/reqwest_request",
-]
+# Deprecated
+# wasabi services support has been removed.
+# We will remove this feature in next version.
+services-wasabi = []
 services-webdav = []
 services-webhdfs = []
 
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index 06e12933e..feaf6746f 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -164,11 +164,6 @@ mod supabase;
 #[cfg(feature = "services-supabase")]
 pub use supabase::Supabase;
 
-#[cfg(feature = "services-wasabi")]
-mod wasabi;
-#[cfg(feature = "services-wasabi")]
-pub use wasabi::Wasabi;
-
 #[cfg(feature = "services-webdav")]
 mod webdav;
 #[cfg(feature = "services-webdav")]
diff --git a/core/src/services/wasabi/backend.rs 
b/core/src/services/wasabi/backend.rs
deleted file mode 100644
index c2f746e77..000000000
--- a/core/src/services/wasabi/backend.rs
+++ /dev/null
@@ -1,951 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::collections::HashMap;
-use std::fmt::Debug;
-use std::fmt::Formatter;
-use std::fmt::Write;
-use std::sync::Arc;
-
-use async_trait::async_trait;
-use base64::prelude::BASE64_STANDARD;
-use base64::Engine;
-use bytes::Buf;
-use http::StatusCode;
-use log::debug;
-use md5::Digest;
-use md5::Md5;
-use once_cell::sync::Lazy;
-use reqsign::AwsConfig;
-use reqsign::AwsCredentialLoad;
-use reqsign::AwsDefaultLoader;
-use reqsign::AwsV4Signer;
-
-use super::core::*;
-use super::error::parse_error;
-use super::error::parse_wasabi_error_code;
-use super::pager::WasabiPager;
-use super::writer::WasabiWriter;
-use crate::raw::*;
-use crate::*;
-
-/// Allow constructing correct region endpoint if user gives a global endpoint.
-static ENDPOINT_TEMPLATES: Lazy<HashMap<&'static str, &'static str>> = 
Lazy::new(|| {
-    let mut m = HashMap::new();
-    // AWS S3 Service.
-    m.insert(
-        "https://s3.wasabisys.com";,
-        "https://s3.{region}.wasabisys.com";,
-    );
-    m
-});
-
-/// Wasabi (an aws S3 compatible service) support
-#[doc = include_str!("docs.md")]
-#[derive(Default)]
-pub struct WasabiBuilder {
-    root: Option<String>,
-
-    bucket: String,
-    endpoint: Option<String>,
-    region: Option<String>,
-    role_arn: Option<String>,
-    external_id: Option<String>,
-    access_key_id: Option<String>,
-    secret_access_key: Option<String>,
-    server_side_encryption: Option<String>,
-    server_side_encryption_aws_kms_key_id: Option<String>,
-    server_side_encryption_customer_algorithm: Option<String>,
-    server_side_encryption_customer_key: Option<String>,
-    server_side_encryption_customer_key_md5: Option<String>,
-    default_storage_class: Option<String>,
-
-    /// temporary credentials, check the official 
[doc](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html)
 for detail
-    security_token: Option<String>,
-
-    disable_config_load: bool,
-    disable_ec2_metadata: bool,
-    enable_virtual_host_style: bool,
-
-    http_client: Option<HttpClient>,
-    customed_credential_load: Option<Box<dyn AwsCredentialLoad>>,
-}
-
-impl Debug for WasabiBuilder {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        let mut d = f.debug_struct("Builder");
-
-        d.field("root", &self.root)
-            .field("bucket", &self.bucket)
-            .field("endpoint", &self.endpoint)
-            .field("region", &self.region);
-
-        d.finish_non_exhaustive()
-    }
-}
-
-impl WasabiBuilder {
-    /// Set root of this backend.
-    ///
-    /// All operations will happen under this root.
-    pub fn root(&mut self, root: &str) -> &mut Self {
-        self.root = if root.is_empty() {
-            None
-        } else {
-            Some(root.to_string())
-        };
-
-        self
-    }
-
-    /// Set bucket name of this backend.
-    pub fn bucket(&mut self, bucket: &str) -> &mut Self {
-        self.bucket = bucket.to_string();
-
-        self
-    }
-
-    /// Set endpoint of this backend.
-    ///
-    /// Endpoint must be full uri, e.g.
-    ///
-    /// - `https://s3.wasabisys.com` or `https://s3.{region}.wasabisys.com`
-    ///
-    /// If user inputs endpoint without scheme like "s3.wasabisys.com", we
-    /// will prepend "https://"; before it.
-    pub fn endpoint(&mut self, endpoint: &str) -> &mut Self {
-        if !endpoint.is_empty() {
-            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
-            self.endpoint = Some(endpoint.trim_end_matches('/').to_string())
-        }
-
-        self
-    }
-
-    /// Region represent the signing region of this endpoint.
-    ///
-    /// - If region is set, we will take user's input first.
-    /// - If not, the default `us-east-1` will be used.
-    pub fn region(&mut self, region: &str) -> &mut Self {
-        if !region.is_empty() {
-            self.region = Some(region.to_string())
-        }
-
-        self
-    }
-
-    /// Set access_key_id of this backend.
-    ///
-    /// - If access_key_id is set, we will take user's input first.
-    /// - If not, we will try to load it from environment.
-    pub fn access_key_id(&mut self, v: &str) -> &mut Self {
-        if !v.is_empty() {
-            self.access_key_id = Some(v.to_string())
-        }
-
-        self
-    }
-
-    /// Set secret_access_key of this backend.
-    ///
-    /// - If secret_access_key is set, we will take user's input first.
-    /// - If not, we will try to load it from environment.
-    pub fn secret_access_key(&mut self, v: &str) -> &mut Self {
-        if !v.is_empty() {
-            self.secret_access_key = Some(v.to_string())
-        }
-
-        self
-    }
-
-    /// Set role_arn for this backend.
-    pub fn role_arn(&mut self, v: &str) -> &mut Self {
-        if !v.is_empty() {
-            self.role_arn = Some(v.to_string())
-        }
-
-        self
-    }
-
-    /// Set external_id for this backend.
-    pub fn external_id(&mut self, v: &str) -> &mut Self {
-        if !v.is_empty() {
-            self.external_id = Some(v.to_string())
-        }
-
-        self
-    }
-
-    /// Set default storage_class for this backend.
-    /// Unlike S3, wasabi only supports one single storage class,
-    /// which is most like standard S3 storage class,
-    /// check `https://docs.wasabi.com/docs/operations-on-objects` for more 
details.
-    ///
-    /// Available values:
-    /// - `STANDARD`
-    pub fn default_storage_class(&mut self, v: &str) -> &mut Self {
-        if !v.is_empty() {
-            self.default_storage_class = Some(v.to_string())
-        }
-
-        self
-    }
-
-    /// Set server_side_encryption for this backend.
-    ///
-    /// Available values: `AES256`, `aws:kms`.
-    ///
-    /// # Note
-    ///
-    /// This function is the low-level setting for SSE related features.
-    ///
-    /// SSE related options should be set carefully to make them works.
-    /// Please use `server_side_encryption_with_*` helpers if even possible.
-    pub fn server_side_encryption(&mut self, v: &str) -> &mut Self {
-        if !v.is_empty() {
-            self.server_side_encryption = Some(v.to_string())
-        }
-
-        self
-    }
-
-    /// Set server_side_encryption_aws_kms_key_id for this backend
-    ///
-    /// - If `server_side_encryption` set to `aws:kms`, and 
`server_side_encryption_aws_kms_key_id`
-    /// is not set, S3 will use aws managed kms key to encrypt data.
-    /// - If `server_side_encryption` set to `aws:kms`, and 
`server_side_encryption_aws_kms_key_id`
-    /// is a valid kms key id, S3 will use the provided kms key to encrypt 
data.
-    /// - If the `server_side_encryption_aws_kms_key_id` is invalid or not 
found, an error will be
-    /// returned.
-    /// - If `server_side_encryption` is not `aws:kms`, setting 
`server_side_encryption_aws_kms_key_id`
-    /// is a noop.
-    ///
-    /// # Note
-    ///
-    /// This function is the low-level setting for SSE related features.
-    ///
-    /// SSE related options should be set carefully to make them works.
-    /// Please use `server_side_encryption_with_*` helpers if even possible.
-    pub fn server_side_encryption_aws_kms_key_id(&mut self, v: &str) -> &mut 
Self {
-        if !v.is_empty() {
-            self.server_side_encryption_aws_kms_key_id = Some(v.to_string())
-        }
-
-        self
-    }
-
-    /// Set server_side_encryption_customer_algorithm for this backend.
-    ///
-    /// Available values: `AES256`.
-    ///
-    /// # Note
-    ///
-    /// This function is the low-level setting for SSE related features.
-    ///
-    /// SSE related options should be set carefully to make them works.
-    /// Please use `server_side_encryption_with_*` helpers if even possible.
-    pub fn server_side_encryption_customer_algorithm(&mut self, v: &str) -> 
&mut Self {
-        if !v.is_empty() {
-            self.server_side_encryption_customer_algorithm = 
Some(v.to_string())
-        }
-
-        self
-    }
-
-    /// Set server_side_encryption_customer_key for this backend.
-    ///
-    /// # Args
-    ///
-    /// `v`: base64 encoded key that matches algorithm specified in
-    /// `server_side_encryption_customer_algorithm`.
-    ///
-    /// # Note
-    ///
-    /// This function is the low-level setting for SSE related features.
-    ///
-    /// SSE related options should be set carefully to make them works.
-    /// Please use `server_side_encryption_with_*` helpers if even possible.
-    pub fn server_side_encryption_customer_key(&mut self, v: &str) -> &mut 
Self {
-        if !v.is_empty() {
-            self.server_side_encryption_customer_key = Some(v.to_string())
-        }
-
-        self
-    }
-
-    /// Set server_side_encryption_customer_key_md5 for this backend.
-    ///
-    /// # Args
-    ///
-    /// `v`: MD5 digest of key specified in 
`server_side_encryption_customer_key`.
-    ///
-    /// # Note
-    ///
-    /// This function is the low-level setting for SSE related features.
-    ///
-    /// SSE related options should be set carefully to make them works.
-    /// Please use `server_side_encryption_with_*` helpers if even possible.
-    pub fn server_side_encryption_customer_key_md5(&mut self, v: &str) -> &mut 
Self {
-        if !v.is_empty() {
-            self.server_side_encryption_customer_key_md5 = Some(v.to_string())
-        }
-
-        self
-    }
-
-    /// Enable server side encryption with aws managed kms key
-    ///
-    /// As known as: SSE-KMS
-    ///
-    /// NOTE: This function should not be used along with other 
`server_side_encryption_with_` functions.
-    pub fn server_side_encryption_with_aws_managed_kms_key(&mut self) -> &mut 
Self {
-        self.server_side_encryption = Some("aws:kms".to_string());
-        self
-    }
-
-    /// Enable server side encryption with customer managed kms key
-    ///
-    /// As known as: SSE-KMS
-    ///
-    /// NOTE: This function should not be used along with other 
`server_side_encryption_with_` functions.
-    pub fn server_side_encryption_with_customer_managed_kms_key(
-        &mut self,
-        aws_kms_key_id: &str,
-    ) -> &mut Self {
-        self.server_side_encryption = Some("aws:kms".to_string());
-        self.server_side_encryption_aws_kms_key_id = 
Some(aws_kms_key_id.to_string());
-        self
-    }
-
-    /// Enable server side encryption with s3 managed key
-    ///
-    /// As known as: SSE-S3
-    ///
-    /// NOTE: This function should not be used along with other 
`server_side_encryption_with_` functions.
-    pub fn server_side_encryption_with_s3_key(&mut self) -> &mut Self {
-        self.server_side_encryption = Some("AES256".to_string());
-        self
-    }
-
-    /// Enable server side encryption with customer key.
-    ///
-    /// As known as: SSE-C
-    ///
-    /// NOTE: This function should not be used along with other 
`server_side_encryption_with_` functions.
-    pub fn server_side_encryption_with_customer_key(
-        &mut self,
-        algorithm: &str,
-        key: &[u8],
-    ) -> &mut Self {
-        self.server_side_encryption_customer_algorithm = 
Some(algorithm.to_string());
-        self.server_side_encryption_customer_key = 
Some(BASE64_STANDARD.encode(key));
-        self.server_side_encryption_customer_key_md5 =
-            Some(BASE64_STANDARD.encode(Md5::digest(key).as_slice()));
-        self
-    }
-
-    /// Set temporary credential used in service connections
-    ///
-    /// # Warning
-    ///
-    /// security token's lifetime is short and requires users to refresh in 
time.
-    pub fn security_token(&mut self, token: &str) -> &mut Self {
-        if !token.is_empty() {
-            self.security_token = Some(token.to_string());
-        }
-        self
-    }
-
-    /// Disable config load so that opendal will not load config from
-    /// environment.
-    ///
-    /// For examples:
-    ///
-    /// - envs like `AWS_ACCESS_KEY_ID`
-    /// - files like `~/.aws/config`
-    pub fn disable_config_load(&mut self) -> &mut Self {
-        self.disable_config_load = true;
-        self
-    }
-
-    /// Disable load credential from ec2 metadata.
-    ///
-    /// This option is used to disable the default behavior of opendal
-    /// to load credential from ec2 metadata, a.k.a, IMDSv2
-    pub fn disable_ec2_metadata(&mut self) -> &mut Self {
-        self.disable_ec2_metadata = true;
-        self
-    }
-
-    /// Enable virtual host style so that opendal will send API requests
-    /// in virtual host style instead of path style.
-    ///
-    /// - By default, opendal will send API to 
`https://s3.us-east-1.wasabisys.com/bucket_name`
-    /// - Enabled, opendal will send API to 
`https://bucket_name.s3.us-east-1.wasabisys.com`
-    pub fn enable_virtual_host_style(&mut self) -> &mut Self {
-        self.enable_virtual_host_style = true;
-        self
-    }
-
-    /// Adding a customed credential load for service.
-    pub fn customed_credential_load(&mut self, cred: Box<dyn 
AwsCredentialLoad>) -> &mut Self {
-        self.customed_credential_load = Some(cred);
-        self
-    }
-
-    /// Specify the http client that used by this service.
-    ///
-    /// # Notes
-    ///
-    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
-    /// during minor updates.
-    pub fn http_client(&mut self, client: HttpClient) -> &mut Self {
-        self.http_client = Some(client);
-        self
-    }
-
-    /// Check if `bucket` is valid
-    /// `bucket` must be not empty and if `enable_virtual_host_style` is true
-    /// it couldn't contain dot(.) character
-    fn is_bucket_valid(&self) -> bool {
-        if self.bucket.is_empty() {
-            return false;
-        }
-        // If enable virtual host style, `bucket` will reside in domain part,
-        // for example `https://bucket_name.s3.us-east-1.amazonaws.com`,
-        // so `bucket` with dot can't be recognized correctly for this format.
-        if self.enable_virtual_host_style && self.bucket.contains('.') {
-            return false;
-        }
-        true
-    }
-
-    /// Build endpoint with given region.
-    fn build_endpoint(&self, region: &str) -> String {
-        let bucket = {
-            debug_assert!(self.is_bucket_valid(), "bucket must be valid");
-
-            self.bucket.as_str()
-        };
-
-        let mut endpoint = match &self.endpoint {
-            Some(endpoint) => {
-                if endpoint.starts_with("http") {
-                    endpoint.to_string()
-                } else {
-                    // Prefix https if endpoint doesn't start with scheme.
-                    format!("https://{endpoint}";)
-                }
-            }
-            None => "https://s3.wasabisys.com".to_string(),
-        };
-
-        // If endpoint contains bucket name, we should trim them.
-        endpoint = endpoint.replace(&format!("//{bucket}."), "//");
-
-        // Update with endpoint templates.
-        endpoint = if let Some(template) = 
ENDPOINT_TEMPLATES.get(endpoint.as_str()) {
-            template.replace("{region}", region)
-        } else {
-            // If we don't know where about this endpoint, just leave
-            // them as it.
-            endpoint.to_string()
-        };
-
-        // Apply virtual host style.
-        if self.enable_virtual_host_style {
-            endpoint = endpoint.replace("//", &format!("//{bucket}."))
-        } else {
-            write!(endpoint, "/{bucket}").expect("write into string must 
succeed");
-        };
-
-        endpoint
-    }
-}
-
-impl Builder for WasabiBuilder {
-    const SCHEME: Scheme = Scheme::Wasabi;
-    type Accessor = WasabiBackend;
-
-    fn from_map(map: HashMap<String, String>) -> Self {
-        let mut builder = WasabiBuilder::default();
-
-        map.get("root").map(|v| builder.root(v));
-        map.get("bucket").map(|v| builder.bucket(v));
-        map.get("endpoint").map(|v| builder.endpoint(v));
-        map.get("region").map(|v| builder.region(v));
-        map.get("access_key_id").map(|v| builder.access_key_id(v));
-        map.get("secret_access_key")
-            .map(|v| builder.secret_access_key(v));
-        map.get("security_token").map(|v| builder.security_token(v));
-        map.get("role_arn").map(|v| builder.role_arn(v));
-        map.get("external_id").map(|v| builder.external_id(v));
-        map.get("server_side_encryption")
-            .map(|v| builder.server_side_encryption(v));
-        map.get("server_side_encryption_aws_kms_key_id")
-            .map(|v| builder.server_side_encryption_aws_kms_key_id(v));
-        map.get("server_side_encryption_customer_algorithm")
-            .map(|v| builder.server_side_encryption_customer_algorithm(v));
-        map.get("server_side_encryption_customer_key")
-            .map(|v| builder.server_side_encryption_customer_key(v));
-        map.get("server_side_encryption_customer_key_md5")
-            .map(|v| builder.server_side_encryption_customer_key_md5(v));
-        map.get("disable_config_load")
-            .filter(|v| *v == "on" || *v == "true")
-            .map(|_| builder.disable_config_load());
-        map.get("disable_ec2_metadata")
-            .filter(|v| *v == "on" || *v == "true")
-            .map(|_| builder.disable_ec2_metadata());
-        map.get("enable_virtual_host_style")
-            .filter(|v| *v == "on" || *v == "true")
-            .map(|_| builder.enable_virtual_host_style());
-        map.get("default_storage_class")
-            .map(|v| builder.default_storage_class(v));
-
-        builder
-    }
-
-    fn build(&mut self) -> Result<Self::Accessor> {
-        debug!("backend build started: {:?}", &self);
-
-        let root = normalize_root(&self.root.take().unwrap_or_default());
-        debug!("backend use root {}", &root);
-
-        // Handle bucket name.
-        let bucket = if self.is_bucket_valid() {
-            Ok(&self.bucket)
-        } else {
-            Err(
-                Error::new(ErrorKind::ConfigInvalid, "The bucket is 
misconfigured")
-                    .with_context("service", Scheme::S3),
-            )
-        }?;
-        debug!("backend use bucket {}", &bucket);
-
-        let default_storage_class = match &self.default_storage_class {
-            None => None,
-            Some(v) => Some(
-                build_header_value(v).map_err(|err| err.with_context("key", 
"storage_class"))?,
-            ),
-        };
-
-        let server_side_encryption = match &self.server_side_encryption {
-            None => None,
-            Some(v) => Some(
-                build_header_value(v)
-                    .map_err(|err| err.with_context("key", 
"server_side_encryption"))?,
-            ),
-        };
-
-        let server_side_encryption_aws_kms_key_id =
-            match &self.server_side_encryption_aws_kms_key_id {
-                None => None,
-                Some(v) => Some(build_header_value(v).map_err(|err| {
-                    err.with_context("key", 
"server_side_encryption_aws_kms_key_id")
-                })?),
-            };
-
-        let server_side_encryption_customer_algorithm =
-            match &self.server_side_encryption_customer_algorithm {
-                None => None,
-                Some(v) => Some(build_header_value(v).map_err(|err| {
-                    err.with_context("key", 
"server_side_encryption_customer_algorithm")
-                })?),
-            };
-
-        let server_side_encryption_customer_key =
-            match &self.server_side_encryption_customer_key {
-                None => None,
-                Some(v) => Some(build_header_value(v).map_err(|err| {
-                    err.with_context("key", 
"server_side_encryption_customer_key")
-                })?),
-            };
-
-        let server_side_encryption_customer_key_md5 =
-            match &self.server_side_encryption_customer_key_md5 {
-                None => None,
-                Some(v) => Some(build_header_value(v).map_err(|err| {
-                    err.with_context("key", 
"server_side_encryption_customer_key_md5")
-                })?),
-            };
-
-        let client = if let Some(client) = self.http_client.take() {
-            client
-        } else {
-            HttpClient::new().map_err(|err| {
-                err.with_operation("Builder::build")
-                    .with_context("service", Scheme::S3)
-            })?
-        };
-
-        let mut cfg = AwsConfig::default();
-        if !self.disable_config_load {
-            cfg = cfg.from_profile();
-            cfg = cfg.from_env();
-        }
-
-        // Setting all value from user input if available.
-        if let Some(v) = self.region.take() {
-            cfg.region = Some(v);
-        }
-        if let Some(v) = self.access_key_id.take() {
-            cfg.access_key_id = Some(v)
-        }
-        if let Some(v) = self.secret_access_key.take() {
-            cfg.secret_access_key = Some(v)
-        }
-        if let Some(v) = self.security_token.take() {
-            cfg.session_token = Some(v)
-        }
-        if let Some(v) = self.role_arn.take() {
-            cfg.role_arn = Some(v)
-        }
-        if let Some(v) = self.external_id.take() {
-            cfg.external_id = Some(v)
-        }
-
-        if cfg.region.is_none() {
-            // region is required to make signer work.
-            //
-            // If we don't know region after loading from builder and env.
-            // We will use `us-east-1` as default.
-            cfg.region = Some("us-east-1".to_string());
-        }
-
-        let region = cfg.region.to_owned().unwrap();
-        debug!("backend use region: {region}");
-
-        // Building endpoint.
-        let endpoint = self.build_endpoint(&region);
-        debug!("backend use endpoint: {endpoint}");
-
-        let mut loader = AwsDefaultLoader::new(client.client(), cfg);
-        if self.disable_ec2_metadata {
-            loader = loader.with_disable_ec2_metadata();
-        }
-
-        let signer = AwsV4Signer::new("s3", &region);
-
-        debug!("backend build finished");
-        Ok(WasabiBackend {
-            core: Arc::new(WasabiCore {
-                bucket: bucket.to_string(),
-                endpoint,
-                root,
-                server_side_encryption,
-                server_side_encryption_aws_kms_key_id,
-                server_side_encryption_customer_algorithm,
-                server_side_encryption_customer_key,
-                server_side_encryption_customer_key_md5,
-                default_storage_class,
-                signer,
-                loader,
-                client,
-            }),
-        })
-    }
-}
-
-/// Backend for wasabi service.
-#[derive(Debug, Clone)]
-pub struct WasabiBackend {
-    core: Arc<WasabiCore>,
-}
-
-#[async_trait]
-impl Accessor for WasabiBackend {
-    type Reader = IncomingAsyncBody;
-    type BlockingReader = ();
-    type Writer = oio::OneShotWriter<WasabiWriter>;
-    type BlockingWriter = ();
-    type Pager = WasabiPager;
-    type BlockingPager = ();
-
-    fn info(&self) -> AccessorInfo {
-        let mut am = AccessorInfo::default();
-        am.set_scheme(Scheme::Wasabi)
-            .set_root(&self.core.root)
-            .set_name(&self.core.bucket)
-            .set_native_capability(Capability {
-                stat: true,
-                stat_with_if_match: true,
-                stat_with_if_none_match: true,
-
-                read: true,
-                read_can_next: true,
-                read_with_range: true,
-
-                write: true,
-                create_dir: true,
-                delete: true,
-                copy: true,
-                rename: true,
-
-                list: true,
-                list_without_delimiter: true,
-                list_with_delimiter_slash: true,
-
-                presign: true,
-                presign_stat: true,
-                presign_read: true,
-                presign_write: true,
-
-                batch: true,
-
-                ..Default::default()
-            });
-
-        am
-    }
-
-    async fn create_dir(&self, path: &str, _: OpCreateDir) -> 
Result<RpCreateDir> {
-        let mut req =
-            self.core
-                .put_object_request(path, Some(0), &OpWrite::default(), 
AsyncBody::Empty)?;
-
-        self.core.sign(&mut req).await?;
-
-        let resp = self.core.send(req).await?;
-
-        let status = resp.status();
-
-        match status {
-            StatusCode::CREATED | StatusCode::OK => {
-                resp.into_body().consume().await?;
-                Ok(RpCreateDir::default())
-            }
-            _ => Err(parse_error(resp).await?),
-        }
-    }
-
-    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
-        let resp = self.core.get_object(path, &args).await?;
-
-        let status = resp.status();
-
-        match status {
-            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), 
resp.into_body())),
-            _ => Err(parse_error(resp).await?),
-        }
-    }
-
-    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        Ok((
-            RpWrite::default(),
-            oio::OneShotWriter::new(WasabiWriter::new(self.core.clone(), args, 
path.to_string())),
-        ))
-    }
-
-    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> 
Result<RpCopy> {
-        let resp = self.core.copy_object(from, to).await?;
-
-        let status = resp.status();
-
-        match status {
-            StatusCode::OK => {
-                // According to the documentation, when using copy_object, a 
200 error may occur and we need to detect it.
-                // 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html#API_CopyObject_RequestSyntax
-                resp.into_body().consume().await?;
-
-                Ok(RpCopy::default())
-            }
-            _ => Err(parse_error(resp).await?),
-        }
-    }
-
-    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
-        // Stat root always returns a DIR.
-        if path == "/" {
-            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
-        }
-
-        let resp = self.core.head_object(path, &args).await?;
-
-        let status = resp.status();
-
-        match status {
-            StatusCode::OK => parse_into_metadata(path, 
resp.headers()).map(RpStat::new),
-            StatusCode::NOT_FOUND if path.ends_with('/') => {
-                Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
-            }
-            _ => Err(parse_error(resp).await?),
-        }
-    }
-
-    async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
-        let resp = self.core.delete_object(path).await?;
-
-        let status = resp.status();
-
-        match status {
-            StatusCode::NO_CONTENT => Ok(RpDelete::default()),
-            _ => Err(parse_error(resp).await?),
-        }
-    }
-
-    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Pager)> {
-        Ok((
-            RpList::default(),
-            WasabiPager::new(self.core.clone(), path, args.delimiter(), 
args.limit()),
-        ))
-    }
-
-    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
-        // We will not send this request out, just for signing.
-        let mut req = match args.operation() {
-            PresignOperation::Stat(v) => self.core.head_object_request(path, 
v)?,
-            PresignOperation::Read(v) => self.core.get_object_request(path, 
v)?,
-            PresignOperation::Write(_) => {
-                self.core
-                    .put_object_request(path, None, &OpWrite::default(), 
AsyncBody::Empty)?
-            }
-        };
-
-        self.core.sign_query(&mut req, args.expire()).await?;
-
-        // We don't need this request anymore, consume it directly.
-        let (parts, _) = req.into_parts();
-
-        Ok(RpPresign::new(PresignedRequest::new(
-            parts.method,
-            parts.uri,
-            parts.headers,
-        )))
-    }
-
-    async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
-        let ops = args.into_operation();
-        if ops.len() > 1000 {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "wasabi services only allow delete up to 1000 keys at once",
-            )
-            .with_context("length", ops.len().to_string()));
-        }
-
-        let paths = ops.into_iter().map(|(p, _)| p).collect();
-
-        let resp = self.core.delete_objects(paths).await?;
-
-        let status = resp.status();
-
-        if let StatusCode::OK = status {
-            let bs = resp.into_body().bytes().await?;
-
-            let result: DeleteObjectsResult =
-                
quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
-
-            let mut batched_result = Vec::with_capacity(result.deleted.len() + 
result.error.len());
-            for i in result.deleted {
-                let path = build_rel_path(&self.core.root, &i.key);
-                batched_result.push((path, Ok(RpDelete::default().into())));
-            }
-            // TODO: we should handle those errors with code.
-            for i in result.error {
-                let path = build_rel_path(&self.core.root, &i.key);
-
-                // set the error kind and mark temporary if retryable
-                let (kind, retryable) =
-                    
parse_wasabi_error_code(&i.code).unwrap_or((ErrorKind::Unexpected, false));
-                let mut err: Error = Error::new(kind, &format!("{i:?}"));
-                if retryable {
-                    err = err.set_temporary();
-                }
-
-                batched_result.push((path, Err(err)));
-            }
-
-            Ok(RpBatch::new(batched_result))
-        } else {
-            Err(parse_error(resp).await?)
-        }
-    }
-
-    /// Execute rename API call
-    /// Wasabi will auto-create missing path for destination `to` if any
-    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> 
Result<RpRename> {
-        let resp = self.core.rename_object(from, to).await?;
-
-        let status = resp.status();
-
-        match status {
-            StatusCode::OK => Ok(RpRename::default()),
-            _ => Err(parse_error(resp).await?),
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_is_valid_bucket() {
-        let bucket_cases = vec![
-            ("", false, false),
-            ("test", false, true),
-            ("test.xyz", false, true),
-            ("", true, false),
-            ("test", true, true),
-            ("test.xyz", true, false),
-        ];
-
-        for (bucket, enable_virtual_host_style, expected) in bucket_cases {
-            let mut b = WasabiBuilder::default();
-            b.bucket(bucket);
-            if enable_virtual_host_style {
-                b.enable_virtual_host_style();
-            }
-            assert_eq!(b.is_bucket_valid(), expected)
-        }
-    }
-
-    #[test]
-    fn test_build_endpoint() {
-        let _ = tracing_subscriber::fmt().with_test_writer().try_init();
-
-        let endpoint_cases = vec![
-            Some("s3.wasabisys.com"),
-            Some("https://s3.wasabisys.com";),
-            Some("https://s3.us-east-2.wasabisys.com";),
-            None,
-        ];
-
-        for endpoint in &endpoint_cases {
-            let mut b = WasabiBuilder::default();
-            b.bucket("test");
-            if let Some(endpoint) = endpoint {
-                b.endpoint(endpoint);
-            }
-
-            let endpoint = b.build_endpoint("us-east-2");
-            assert_eq!(endpoint, "https://s3.us-east-2.wasabisys.com/test";);
-        }
-
-        for endpoint in &endpoint_cases {
-            let mut b = WasabiBuilder::default();
-            b.bucket("test");
-            b.enable_virtual_host_style();
-            if let Some(endpoint) = endpoint {
-                b.endpoint(endpoint);
-            }
-
-            let endpoint = b.build_endpoint("us-east-2");
-            assert_eq!(endpoint, "https://test.s3.us-east-2.wasabisys.com";);
-        }
-    }
-}
diff --git a/core/src/services/wasabi/core.rs b/core/src/services/wasabi/core.rs
deleted file mode 100644
index 2a44bb564..000000000
--- a/core/src/services/wasabi/core.rs
+++ /dev/null
@@ -1,911 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::fmt;
-use std::fmt::Debug;
-use std::fmt::Formatter;
-use std::fmt::Write;
-use std::time::Duration;
-
-use bytes::Bytes;
-use http::header::HeaderName;
-use http::header::CACHE_CONTROL;
-use http::header::CONTENT_DISPOSITION;
-use http::header::CONTENT_LENGTH;
-use http::header::CONTENT_TYPE;
-use http::HeaderValue;
-use http::Request;
-use http::Response;
-use reqsign::AwsCredential;
-use reqsign::AwsDefaultLoader;
-use reqsign::AwsV4Signer;
-use serde::Deserialize;
-use serde::Serialize;
-
-use crate::raw::*;
-use crate::*;
-
-mod constants {
-    pub const X_AMZ_COPY_SOURCE: &str = "x-amz-copy-source";
-
-    pub const X_AMZ_SERVER_SIDE_ENCRYPTION: &str = 
"x-amz-server-side-encryption";
-    pub const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: &str =
-        "x-amz-server-side-encryption-customer-algorithm";
-    pub const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: &str =
-        "x-amz-server-side-encryption-customer-key";
-    pub const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: &str =
-        "x-amz-server-side-encryption-customer-key-md5";
-    pub const X_AMZ_SERVER_SIDE_ENCRYPTION_AWS_KMS_KEY_ID: &str =
-        "x-amz-server-side-encryption-aws-kms-key-id";
-    pub const X_AMZ_STORAGE_CLASS: &str = "x-amz-storage-class";
-
-    #[allow(dead_code)]
-    pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: 
&str =
-        "x-amz-copy-source-server-side-encryption-customer-algorithm";
-    #[allow(dead_code)]
-    pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: &str =
-        "x-amz-copy-source-server-side-encryption-customer-key";
-    #[allow(dead_code)]
-    pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: &str =
-        "x-amz-copy-source-server-side-encryption-customer-key-md5";
-
-    pub const RESPONSE_CONTENT_DISPOSITION: &str = 
"response-content-disposition";
-    pub const RESPONSE_CACHE_CONTROL: &str = "response-cache-control";
-
-    pub const DESTINATION: &str = "Destination";
-    pub const OVERWRITE: &str = "Overwrite";
-}
-
-pub struct WasabiCore {
-    pub bucket: String,
-    pub endpoint: String,
-    pub root: String,
-    pub server_side_encryption: Option<HeaderValue>,
-    pub server_side_encryption_aws_kms_key_id: Option<HeaderValue>,
-    pub server_side_encryption_customer_algorithm: Option<HeaderValue>,
-    pub server_side_encryption_customer_key: Option<HeaderValue>,
-    pub server_side_encryption_customer_key_md5: Option<HeaderValue>,
-    pub default_storage_class: Option<HeaderValue>,
-
-    pub signer: AwsV4Signer,
-    pub loader: AwsDefaultLoader,
-    pub client: HttpClient,
-}
-
-impl Debug for WasabiCore {
-    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
-        f.debug_struct("WasabiCore")
-            .field("bucket", &self.bucket)
-            .field("endpoint", &self.endpoint)
-            .field("root", &self.root)
-            .finish_non_exhaustive()
-    }
-}
-
-impl WasabiCore {
-    /// If credential is not found, we will not sign the request.
-    async fn load_credential(&self) -> Result<Option<AwsCredential>> {
-        let cred = self
-            .loader
-            .load()
-            .await
-            .map_err(new_request_credential_error)?;
-
-        if let Some(cred) = cred {
-            Ok(Some(cred))
-        } else {
-            // Mark this error as temporary since it could be caused by AWS 
STS.
-            Err(Error::new(
-                ErrorKind::PermissionDenied,
-                "no valid credential found, please check configuration or try 
again",
-            )
-            .set_temporary())
-        }
-    }
-
-    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
-        let cred = if let Some(cred) = self.load_credential().await? {
-            cred
-        } else {
-            return Ok(());
-        };
-
-        self.signer.sign(req, &cred).map_err(new_request_sign_error)
-    }
-
-    pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: 
Duration) -> Result<()> {
-        let cred = if let Some(cred) = self.load_credential().await? {
-            cred
-        } else {
-            return Ok(());
-        };
-
-        self.signer
-            .sign_query(req, duration, &cred)
-            .map_err(new_request_sign_error)
-    }
-
-    #[inline]
-    pub async fn send(&self, req: Request<AsyncBody>) -> 
Result<Response<IncomingAsyncBody>> {
-        self.client.send(req).await
-    }
-
-    /// # Note
-    ///
-    /// header like X_AMZ_SERVER_SIDE_ENCRYPTION doesn't need to set while
-    /// get or stat.
-    pub fn insert_sse_headers(
-        &self,
-        mut req: http::request::Builder,
-        is_write: bool,
-    ) -> http::request::Builder {
-        if is_write {
-            if let Some(v) = &self.server_side_encryption {
-                let mut v = v.clone();
-                v.set_sensitive(true);
-
-                req = req.header(
-                    
HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION),
-                    v,
-                )
-            }
-            if let Some(v) = &self.server_side_encryption_aws_kms_key_id {
-                let mut v = v.clone();
-                v.set_sensitive(true);
-
-                req = req.header(
-                    
HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_AWS_KMS_KEY_ID),
-                    v,
-                )
-            }
-        }
-
-        if let Some(v) = &self.server_side_encryption_customer_algorithm {
-            let mut v = v.clone();
-            v.set_sensitive(true);
-
-            req = req.header(
-                
HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM),
-                v,
-            )
-        }
-        if let Some(v) = &self.server_side_encryption_customer_key {
-            let mut v = v.clone();
-            v.set_sensitive(true);
-
-            req = req.header(
-                
HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY),
-                v,
-            )
-        }
-        if let Some(v) = &self.server_side_encryption_customer_key_md5 {
-            let mut v = v.clone();
-            v.set_sensitive(true);
-
-            req = req.header(
-                
HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5),
-                v,
-            )
-        }
-
-        req
-    }
-}
-
-impl WasabiCore {
-    pub fn head_object_request(&self, path: &str, args: &OpStat) -> 
Result<Request<AsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
-
-        let mut req = Request::head(&url);
-
-        req = self.insert_sse_headers(req, false);
-
-        if let Some(v) = args.if_match() {
-            req = req.header(http::header::IF_MATCH, v);
-        }
-
-        if let Some(v) = args.if_none_match() {
-            req = req.header(http::header::IF_NONE_MATCH, v);
-        }
-
-        let req = req
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        Ok(req)
-    }
-
-    pub fn get_object_request(&self, path: &str, args: &OpRead) -> 
Result<Request<AsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        // Construct headers to add to the request
-        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
-
-        // Add query arguments to the URL based on response overrides
-        let mut query_args = Vec::new();
-        if let Some(override_content_disposition) = 
args.override_content_disposition() {
-            query_args.push(format!(
-                "{}={}",
-                constants::RESPONSE_CONTENT_DISPOSITION,
-                percent_encode_path(override_content_disposition)
-            ))
-        }
-        if let Some(override_cache_control) = args.override_cache_control() {
-            query_args.push(format!(
-                "{}={}",
-                constants::RESPONSE_CACHE_CONTROL,
-                percent_encode_path(override_cache_control)
-            ))
-        }
-        if !query_args.is_empty() {
-            url.push_str(&format!("?{}", query_args.join("&")));
-        }
-
-        let mut req = Request::get(&url);
-
-        let range = args.range();
-        if !range.is_full() {
-            req = req.header(http::header::RANGE, range.to_header());
-        }
-
-        if let Some(if_none_match) = args.if_none_match() {
-            req = req.header(http::header::IF_NONE_MATCH, if_none_match);
-        }
-
-        // Set SSE headers.
-        // TODO: how will this work with presign?
-        req = self.insert_sse_headers(req, false);
-
-        let req = req
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        Ok(req)
-    }
-
-    pub async fn get_object(
-        &self,
-        path: &str,
-        args: &OpRead,
-    ) -> Result<Response<IncomingAsyncBody>> {
-        let mut req = self.get_object_request(path, args)?;
-
-        self.sign(&mut req).await?;
-
-        self.send(req).await
-    }
-
-    pub fn put_object_request(
-        &self,
-        path: &str,
-        size: Option<usize>,
-        args: &OpWrite,
-        body: AsyncBody,
-    ) -> Result<Request<AsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
-
-        let mut req = Request::put(&url);
-
-        if let Some(size) = size {
-            req = req.header(CONTENT_LENGTH, size)
-        }
-
-        if let Some(mime) = args.content_type() {
-            req = req.header(CONTENT_TYPE, mime)
-        }
-
-        if let Some(pos) = args.content_disposition() {
-            req = req.header(CONTENT_DISPOSITION, pos)
-        }
-
-        if let Some(cache_control) = args.cache_control() {
-            req = req.header(CACHE_CONTROL, cache_control)
-        }
-
-        // Set storage class header
-        if let Some(v) = &self.default_storage_class {
-            req = 
req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v);
-        }
-
-        // Set SSE headers.
-        req = self.insert_sse_headers(req, true);
-
-        // Set body
-        let req = req.body(body).map_err(new_request_build_error)?;
-
-        Ok(req)
-    }
-
-    pub async fn head_object(
-        &self,
-        path: &str,
-        args: &OpStat,
-    ) -> Result<Response<IncomingAsyncBody>> {
-        let mut req = self.head_object_request(path, args)?;
-
-        self.sign(&mut req).await?;
-
-        self.send(req).await
-    }
-
-    pub async fn delete_object(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
-
-        let mut req = Request::delete(&url)
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        self.sign(&mut req).await?;
-
-        self.send(req).await
-    }
-
-    pub async fn copy_object(&self, from: &str, to: &str) -> 
Result<Response<IncomingAsyncBody>> {
-        let from = build_abs_path(&self.root, from);
-        let to = build_abs_path(&self.root, to);
-
-        let source = format!("{}/{}", self.bucket, percent_encode_path(&from));
-        let target = format!("{}/{}", self.endpoint, percent_encode_path(&to));
-
-        let mut req = Request::put(&target);
-
-        // Set SSE headers.
-        req = self.insert_sse_headers(req, true);
-
-        if let Some(v) = &self.server_side_encryption_customer_algorithm {
-            let mut v = v.clone();
-            v.set_sensitive(true);
-
-            req = req.header(
-                HeaderName::from_static(
-                    
constants::X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM,
-                ),
-                v,
-            )
-        }
-
-        if let Some(v) = &self.server_side_encryption_customer_key {
-            let mut v = v.clone();
-            v.set_sensitive(true);
-
-            req = req.header(
-                HeaderName::from_static(
-                    
constants::X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY,
-                ),
-                v,
-            )
-        }
-
-        if let Some(v) = &self.server_side_encryption_customer_key_md5 {
-            let mut v = v.clone();
-            v.set_sensitive(true);
-
-            req = req.header(
-                HeaderName::from_static(
-                    
constants::X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5,
-                ),
-                v,
-            )
-        }
-
-        let mut req = req
-            .header(constants::X_AMZ_COPY_SOURCE, &source)
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        self.sign(&mut req).await?;
-
-        self.send(req).await
-    }
-
-    /// Make this functions as `pub(suber)` because `DirStream` depends
-    /// on this.
-    pub async fn list_objects(
-        &self,
-        path: &str,
-        continuation_token: &str,
-        delimiter: &str,
-        limit: Option<usize>,
-    ) -> Result<Response<IncomingAsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        let mut url = format!(
-            "{}?list-type=2&delimiter={delimiter}&prefix={}",
-            self.endpoint,
-            percent_encode_path(&p)
-        );
-        if let Some(limit) = limit {
-            write!(url, "&max-keys={limit}").expect("write into string must 
succeed");
-        }
-        if !continuation_token.is_empty() {
-            // AWS S3 could return continuation-token that contains `=`
-            // which could lead `reqsign` parse query wrongly.
-            // URL encode continuation-token before starting signing so that
-            // our signer will not be confused.
-            write!(
-                url,
-                "&continuation-token={}",
-                percent_encode_path(continuation_token)
-            )
-            .expect("write into string must succeed");
-        }
-
-        let mut req = Request::get(&url)
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        self.sign(&mut req).await?;
-
-        self.send(req).await
-    }
-
-    pub async fn initiate_multipart_upload(
-        &self,
-        path: &str,
-        args: &OpWrite,
-    ) -> Result<Response<IncomingAsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        let url = format!("{}/{}?uploads", self.endpoint, 
percent_encode_path(&p));
-
-        let mut req = Request::post(&url);
-
-        if let Some(mime) = args.content_type() {
-            req = req.header(CONTENT_TYPE, mime)
-        }
-
-        if let Some(content_disposition) = args.content_disposition() {
-            req = req.header(CONTENT_DISPOSITION, content_disposition)
-        }
-
-        if let Some(cache_control) = args.cache_control() {
-            req = req.header(CACHE_CONTROL, cache_control)
-        }
-
-        // Set storage class header
-        if let Some(v) = &self.default_storage_class {
-            req = 
req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v);
-        }
-
-        // Set SSE headers.
-        let req = self.insert_sse_headers(req, true);
-
-        let mut req = req
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        self.sign(&mut req).await?;
-
-        self.send(req).await
-    }
-
-    pub fn upload_part_request(
-        &self,
-        path: &str,
-        upload_id: &str,
-        part_number: usize,
-        size: Option<u64>,
-        body: AsyncBody,
-    ) -> Result<Request<AsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        let url = format!(
-            "{}/{}?partNumber={}&uploadId={}",
-            self.endpoint,
-            percent_encode_path(&p),
-            part_number,
-            percent_encode_path(upload_id)
-        );
-
-        let mut req = Request::put(&url);
-
-        if let Some(size) = size {
-            req = req.header(CONTENT_LENGTH, size);
-        }
-
-        // Set SSE headers.
-        req = self.insert_sse_headers(req, true);
-
-        // Set body
-        let req = req.body(body).map_err(new_request_build_error)?;
-
-        Ok(req)
-    }
-
-    pub async fn complete_multipart_upload(
-        &self,
-        path: &str,
-        upload_id: &str,
-        parts: &[CompleteMultipartUploadRequestPart],
-    ) -> Result<Response<IncomingAsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        let url = format!(
-            "{}/{}?uploadId={}",
-            self.endpoint,
-            percent_encode_path(&p),
-            percent_encode_path(upload_id)
-        );
-
-        let req = Request::post(&url);
-
-        // Set SSE headers.
-        let req = self.insert_sse_headers(req, true);
-
-        let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest 
{
-            part: parts.to_vec(),
-        })
-        .map_err(new_xml_deserialize_error)?;
-        // Make sure content length has been set to avoid post with chunked 
encoding.
-        let req = req.header(CONTENT_LENGTH, content.len());
-        // Set content-type to `application/xml` to avoid mixed with form post.
-        let req = req.header(CONTENT_TYPE, "application/xml");
-
-        let mut req = req
-            .body(AsyncBody::Bytes(Bytes::from(content)))
-            .map_err(new_request_build_error)?;
-
-        self.sign(&mut req).await?;
-
-        self.send(req).await
-    }
-
-    /// Abort an on-going multipart upload.
-    pub async fn abort_multipart_upload(
-        &self,
-        path: &str,
-        upload_id: &str,
-    ) -> Result<Response<IncomingAsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        let url = format!(
-            "{}/{}?uploadId={}",
-            self.endpoint,
-            percent_encode_path(&p),
-            percent_encode_path(upload_id)
-        );
-
-        let mut req = Request::delete(&url)
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-        self.sign(&mut req).await?;
-        self.send(req).await
-    }
-
-    pub async fn delete_objects(&self, paths: Vec<String>) -> 
Result<Response<IncomingAsyncBody>> {
-        let url = format!("{}/?delete", self.endpoint);
-
-        let req = Request::post(&url);
-
-        let content = quick_xml::se::to_string(&DeleteObjectsRequest {
-            object: paths
-                .into_iter()
-                .map(|path| DeleteObjectsRequestObject {
-                    key: build_abs_path(&self.root, &path),
-                })
-                .collect(),
-        })
-        .map_err(new_xml_deserialize_error)?;
-
-        // Make sure content length has been set to avoid post with chunked 
encoding.
-        let req = req.header(CONTENT_LENGTH, content.len());
-        // Set content-type to `application/xml` to avoid mixed with form post.
-        let req = req.header(CONTENT_TYPE, "application/xml");
-        // Set content-md5 as required by API.
-        let req = req.header("CONTENT-MD5", 
format_content_md5(content.as_bytes()));
-
-        let mut req = req
-            .body(AsyncBody::Bytes(Bytes::from(content)))
-            .map_err(new_request_build_error)?;
-
-        self.sign(&mut req).await?;
-
-        self.send(req).await
-    }
-
-    pub async fn put_object(
-        &self,
-        path: &str,
-        size: Option<usize>,
-        args: &OpWrite,
-        body: AsyncBody,
-    ) -> Result<Response<IncomingAsyncBody>> {
-        let mut req = self.put_object_request(path, size, args, body)?;
-
-        self.sign(&mut req).await?;
-
-        self.send(req).await
-    }
-
-    pub async fn rename_object(&self, from: &str, to: &str) -> 
Result<Response<IncomingAsyncBody>> {
-        let from = percent_encode_path(build_abs_path(&self.root, 
from).as_str());
-        let to = percent_encode_path(build_abs_path(&self.root, to).as_str());
-
-        let url = format!("{}/{}", self.endpoint, from);
-
-        let mut req = Request::builder().method("MOVE").uri(url);
-
-        // Set SSE headers.
-        req = self.insert_sse_headers(req, true);
-
-        let mut req = req
-            .header(constants::DESTINATION, to)
-            .header(constants::OVERWRITE, "true")
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        self.sign(&mut req).await?;
-
-        self.send(req).await
-    }
-
-    pub async fn append_object(
-        &self,
-        path: &str,
-        size: Option<usize>,
-        body: AsyncBody,
-    ) -> Result<Response<IncomingAsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        let url = format!("{}/{}?append", self.endpoint, 
percent_encode_path(&p));
-
-        let mut req = Request::put(&url);
-
-        if let Some(size) = size {
-            req = req.header(CONTENT_LENGTH, size)
-        }
-
-        // Set storage class header
-        if let Some(v) = &self.default_storage_class {
-            req = 
req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v);
-        }
-
-        // Set SSE headers.
-        req = self.insert_sse_headers(req, true);
-
-        let mut req = req.body(body).map_err(new_request_build_error)?;
-
-        self.sign(&mut req).await?;
-
-        self.send(req).await
-    }
-}
-
-/// Result of CreateMultipartUpload
-#[derive(Default, Debug, Deserialize)]
-#[serde(default, rename_all = "PascalCase")]
-pub struct InitiateMultipartUploadResult {
-    pub upload_id: String,
-}
-
-/// Request of CompleteMultipartUploadRequest
-#[derive(Default, Debug, Serialize)]
-#[serde(default, rename = "CompleteMultipartUpload", rename_all = 
"PascalCase")]
-pub struct CompleteMultipartUploadRequest {
-    pub part: Vec<CompleteMultipartUploadRequestPart>,
-}
-
-#[derive(Clone, Default, Debug, Serialize)]
-#[serde(default, rename_all = "PascalCase")]
-pub struct CompleteMultipartUploadRequestPart {
-    #[serde(rename = "PartNumber")]
-    pub part_number: usize,
-    /// # TODO
-    ///
-    /// quick-xml will do escape on `"` which leads to our serialized output is
-    /// not the same as aws s3's example.
-    ///
-    /// Ideally, we could use `serialize_with` to address this (buf failed)
-    ///
-    /// ```ignore
-    /// #[derive(Default, Debug, Serialize)]
-    /// #[serde(default, rename_all = "PascalCase")]
-    /// struct CompleteMultipartUploadRequestPart {
-    ///     #[serde(rename = "PartNumber")]
-    ///     part_number: usize,
-    ///     #[serde(rename = "ETag", serialize_with = "partial_escape")]
-    ///     etag: String,
-    /// }
-    ///
-    /// fn partial_escape<S>(s: &str, ser: S) -> std::result::Result<S::Ok, 
S::Error>
-    /// where
-    ///     S: serde::Serializer,
-    /// {
-    ///     ser.serialize_str(&String::from_utf8_lossy(
-    ///         &quick_xml::escape::partial_escape(s.as_bytes()),
-    ///     ))
-    /// }
-    /// ```
-    ///
-    /// ref: <https://github.com/tafia/quick-xml/issues/362>
-    #[serde(rename = "ETag")]
-    pub etag: String,
-}
-
-/// Request of DeleteObjects.
-#[derive(Default, Debug, Serialize)]
-#[serde(default, rename = "Delete", rename_all = "PascalCase")]
-pub struct DeleteObjectsRequest {
-    pub object: Vec<DeleteObjectsRequestObject>,
-}
-
-#[derive(Default, Debug, Serialize)]
-#[serde(rename_all = "PascalCase")]
-pub struct DeleteObjectsRequestObject {
-    pub key: String,
-}
-
-/// Result of DeleteObjects.
-#[derive(Default, Debug, Deserialize)]
-#[serde(default, rename = "DeleteResult", rename_all = "PascalCase")]
-pub struct DeleteObjectsResult {
-    pub deleted: Vec<DeleteObjectsResultDeleted>,
-    pub error: Vec<DeleteObjectsResultError>,
-}
-
-#[derive(Default, Debug, Deserialize)]
-#[serde(rename_all = "PascalCase")]
-pub struct DeleteObjectsResultDeleted {
-    pub key: String,
-}
-
-#[derive(Default, Debug, Deserialize)]
-#[serde(default, rename_all = "PascalCase")]
-pub struct DeleteObjectsResultError {
-    pub code: String,
-    pub key: String,
-    pub message: String,
-}
-
-#[cfg(test)]
-mod tests {
-    use bytes::Buf;
-    use bytes::Bytes;
-
-    use super::*;
-
-    /// This example is from 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html#API_CreateMultipartUpload_Examples
-    #[test]
-    fn test_deserialize_initiate_multipart_upload_result() {
-        let bs = Bytes::from(
-            r#"<?xml version="1.0" encoding="UTF-8"?>
-            <InitiateMultipartUploadResult 
xmlns="http://s3.amazonaws.com/doc/2006-03-01/";>
-              <Bucket>example-bucket</Bucket>
-              <Key>example-object</Key>
-              
<UploadId>VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA</UploadId>
-            </InitiateMultipartUploadResult>"#,
-        );
-
-        let out: InitiateMultipartUploadResult =
-            quick_xml::de::from_reader(bs.reader()).expect("must success");
-
-        assert_eq!(
-            out.upload_id,
-            "VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA"
-        )
-    }
-
-    /// This example is from 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html#API_CompleteMultipartUpload_Examples
-    #[test]
-    fn test_serialize_complete_multipart_upload_request() {
-        let req = CompleteMultipartUploadRequest {
-            part: vec![
-                CompleteMultipartUploadRequestPart {
-                    part_number: 1,
-                    etag: "\"a54357aff0632cce46d942af68356b38\"".to_string(),
-                },
-                CompleteMultipartUploadRequestPart {
-                    part_number: 2,
-                    etag: "\"0c78aef83f66abc1fa1e8477f296d394\"".to_string(),
-                },
-                CompleteMultipartUploadRequestPart {
-                    part_number: 3,
-                    etag: "\"acbd18db4cc2f85cedef654fccc4a4d8\"".to_string(),
-                },
-            ],
-        };
-
-        let actual = quick_xml::se::to_string(&req).expect("must succeed");
-
-        pretty_assertions::assert_eq!(
-            actual,
-            r#"<CompleteMultipartUpload>
-             <Part>
-                <PartNumber>1</PartNumber>
-               <ETag>"a54357aff0632cce46d942af68356b38"</ETag>
-             </Part>
-             <Part>
-                <PartNumber>2</PartNumber>
-               <ETag>"0c78aef83f66abc1fa1e8477f296d394"</ETag>
-             </Part>
-             <Part>
-               <PartNumber>3</PartNumber>
-               <ETag>"acbd18db4cc2f85cedef654fccc4a4d8"</ETag>
-             </Part>
-            </CompleteMultipartUpload>"#
-                // Cleanup space and new line
-                .replace([' ', '\n'], "")
-                // Escape `"` by hand to address 
<https://github.com/tafia/quick-xml/issues/362>
-                .replace('"', "&quot;")
-        )
-    }
-
-    /// This example is from 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_Examples
-    #[test]
-    fn test_serialize_delete_objects_request() {
-        let req = DeleteObjectsRequest {
-            object: vec![
-                DeleteObjectsRequestObject {
-                    key: "sample1.txt".to_string(),
-                },
-                DeleteObjectsRequestObject {
-                    key: "sample2.txt".to_string(),
-                },
-            ],
-        };
-
-        let actual = quick_xml::se::to_string(&req).expect("must succeed");
-
-        pretty_assertions::assert_eq!(
-            actual,
-            r#"<Delete>
-             <Object>
-             <Key>sample1.txt</Key>
-             </Object>
-             <Object>
-               <Key>sample2.txt</Key>
-             </Object>
-             </Delete>"#
-                // Cleanup space and new line
-                .replace([' ', '\n'], "")
-        )
-    }
-
-    /// This example is from 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_Examples
-    #[test]
-    fn test_deserialize_delete_objects_result() {
-        let bs = Bytes::from(
-            r#"<?xml version="1.0" encoding="UTF-8"?>
-            <DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/";>
-             <Deleted>
-               <Key>sample1.txt</Key>
-             </Deleted>
-             <Error>
-              <Key>sample2.txt</Key>
-              <Code>AccessDenied</Code>
-              <Message>Access Denied</Message>
-             </Error>
-            </DeleteResult>"#,
-        );
-
-        let out: DeleteObjectsResult =
-            quick_xml::de::from_reader(bs.reader()).expect("must success");
-
-        assert_eq!(out.deleted.len(), 1);
-        assert_eq!(out.deleted[0].key, "sample1.txt");
-        assert_eq!(out.error.len(), 1);
-        assert_eq!(out.error[0].key, "sample2.txt");
-        assert_eq!(out.error[0].code, "AccessDenied");
-        assert_eq!(out.error[0].message, "Access Denied");
-    }
-}
diff --git a/core/src/services/wasabi/docs.md b/core/src/services/wasabi/docs.md
deleted file mode 100644
index 4b2cb2d42..000000000
--- a/core/src/services/wasabi/docs.md
+++ /dev/null
@@ -1,198 +0,0 @@
-## Capabilities
-
-This service can be used to:
-
-- [x] stat
-- [x] read
-- [x] write
-- [x] create_dir
-- [x] delete
-- [x] copy
-- [x] rename
-- [x] list
-- [x] scan
-- [x] presign
-- [ ] blocking
-
-## Configuration
-
-- `root`: Set the work dir for backend.
-- `bucket`: Set the container name for backend.
-- `endpoint`: Set the endpoint for backend.
-- `region`: Set the region for backend.
-- `access_key_id`: Set the access_key_id for backend.
-- `secret_access_key`: Set the secret_access_key for backend.
-- `security_token`: Set the security_token for backend.
-- `default_storage_class`: Set the default storage_class for backend.
-- `server_side_encryption`: Set the server_side_encryption for backend.
-- `server_side_encryption_aws_kms_key_id`: Set the 
server_side_encryption_aws_kms_key_id for backend.
-- `server_side_encryption_customer_algorithm`: Set the 
server_side_encryption_customer_algorithm for kend.
-- `server_side_encryption_customer_key`: Set the 
server_side_encryption_customer_key for backend.
-- `server_side_encryption_customer_key_md5`: Set the 
server_side_encryption_customer_key_md5 for kend.
-- `disable_config_load`: Disable aws config load from env
-- `enable_virtual_host_style`: Enable virtual host style.
-
-Refer to [`WasabiBuilder`]'s public API docs for more information.
-
-### Temporary security credentials
-
-OpenDAL now provides support for S3 temporary security credentials in IAM.
-
-The way to take advantage of this feature is to build your S3 backend with 
`Builder::security_token`.
-
-But OpenDAL will not refresh the temporary security credentials, please keep 
in mind to refresh those entials in time.
-
-### Server Side Encryption
-
-OpenDAL provides full support of S3 Server Side Encryption(SSE) features.
-
-The easiest way to configure them is to use helper functions like
-
-- SSE-KMS: `server_side_encryption_with_aws_managed_kms_key`
-- SSE-KMS: `server_side_encryption_with_customer_managed_kms_key`
-- SSE-S3: `server_side_encryption_with_s3_key`
-- SSE-C: `server_side_encryption_with_customer_key`
-
-If those functions don't fulfill need, low-level options are also provided:
-
-- Use service managed kms key
-  - `server_side_encryption="aws:kms"`
-- Use customer provided kms key
-  - `server_side_encryption="aws:kms"`
-  - `server_side_encryption_aws_kms_key_id="your-kms-key"`
-- Use S3 managed key
-  - `server_side_encryption="AES256"`
-- Use customer key
-  - `server_side_encryption_customer_algorithm="AES256"`
-  - `server_side_encryption_customer_key="base64-of-your-aes256-key"`
-  - `server_side_encryption_customer_key_md5="base64-of-your-aes256-key-md5"`
-
-After SSE have been configured, all requests send by this backed will attach 
those headers.
-
-Reference: [Protecting data using server-side 
encryption](https://docs.aws.amazon.com/AmazonS3/latest/rguide/serv-side-encryption.html)
-
-## Example
-
-### Via Builder
-
-#### Basic Setup
-
-```rust
-use anyhow::Result;
-use opendal::services::Wasabi;
-use opendal::Operator;
-
-#[tokio::main]
-async fn main() -> Result<()> {
-    // Create s3 backend builder.
-    let mut builder = Wasabi::default();
-    // Set the root for s3, all operations will happen under this root.
-    //
-    // NOTE: the root must be absolute path.
-    builder.root("/path/to/dir");
-    // Set the bucket name, this is required.
-    builder.bucket("test");
-    // Set the endpoint.
-    //
-    // For examples:
-    // - "https://s3.wasabisys.com";
-    // - "http://127.0.0.1:9000";
-    // - "https://oss-ap-northeast-1.aliyuncs.com";
-    // - "https://cos.ap-seoul.myqcloud.com";
-    //
-    // Default to "https://s3.wasabisys.com";
-    builder.endpoint("https://s3.wasabisys.com";);
-    // Set the access_key_id and secret_access_key.
-    //
-    // OpenDAL will try load credential from the env.
-    // If credential not set and no valid credential in env, OpenDAL will
-    // send request without signing like anonymous user.
-    builder.access_key_id("access_key_id");
-    builder.secret_access_key("secret_access_key");
-
-    let op: Operator = Operator::new(builder)?.finish();
-
-    Ok(())
-}
-```
-
-#### Wasabi with SSE-C
-
-```rust
-use anyhow::Result;
-use opendal::services::Wasabi;
-use opendal::Operator;
-
-#[tokio::main]
-async fn main() -> Result<()> {
-    let mut builder = Wasabi::default();
-
-    // Enable SSE-C
-    builder.server_side_encryption_with_customer_key("AES256", 
"customer_key".as_bytes());
-
-    let op = Operator::new(builder)?.finish();
-
-    Ok(())
-}
-```
-
-#### Wasabi with SSE-KMS and aws managed kms key
-
-```rust
-use anyhow::Result;
-use opendal::services::Wasabi;
-use opendal::Operator;
-
-#[tokio::main]
-async fn main() -> Result<()> {
-    let mut builder = Wasabi::default();
-
-    // Enable SSE-KMS with aws managed kms key
-    builder.server_side_encryption_with_aws_managed_kms_key();
-
-    let op = Operator::new(builder)?.finish();
-
-    Ok(())
-}
-```
-
-#### Wasabi with SSE-KMS and customer managed kms key
-
-```rust
-use anyhow::Result;
-use opendal::services::Wasabi;
-use opendal::Operator;
-
-#[tokio::main]
-async fn main() -> Result<()> {
-    let mut builder = Wasabi::default();
-
-    // Enable SSE-KMS with customer managed kms key
-    
builder.server_side_encryption_with_customer_managed_kms_key("aws_kms_key_id");
-
-    let op = Operator::new(builder)?.finish();
-
-    Ok(())
-}
-```
-
-#### Wasabi with SSE-S3
-
-```rust
-use anyhow::Result;
-use log::info;
-use opendal::services::Wasabi;
-use opendal::Operator;
-
-#[tokio::main]
-async fn main() -> Result<()> {
-    let mut builder = Wasabi::default();
-
-    // Enable SSE-S3
-    builder.server_side_encryption_with_s3_key();
-
-    let op = Operator::new(builder)?.finish();
-
-    Ok(())
-}
-```
\ No newline at end of file
diff --git a/core/src/services/wasabi/error.rs 
b/core/src/services/wasabi/error.rs
deleted file mode 100644
index 74bb4aa2e..000000000
--- a/core/src/services/wasabi/error.rs
+++ /dev/null
@@ -1,131 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use bytes::Buf;
-use http::Response;
-use http::StatusCode;
-use quick_xml::de;
-use serde::Deserialize;
-
-use crate::raw::*;
-use crate::Error;
-use crate::ErrorKind;
-use crate::Result;
-
-/// WasabiError is the error returned by wasabi service.
-#[derive(Default, Debug, Deserialize)]
-#[serde(default, rename_all = "PascalCase")]
-struct WasabiError {
-    code: String,
-    message: String,
-    resource: String,
-    request_id: String,
-}
-
-/// Parse error response into Error.
-pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
-    let (parts, body) = resp.into_parts();
-    let bs = body.bytes().await?;
-
-    let (mut kind, mut retryable) = match parts.status {
-        StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
-        StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
-        StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => {
-            (ErrorKind::ConditionNotMatch, false)
-        }
-        StatusCode::INTERNAL_SERVER_ERROR
-        | StatusCode::BAD_GATEWAY
-        | StatusCode::SERVICE_UNAVAILABLE
-        | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true),
-        _ => (ErrorKind::Unexpected, false),
-    };
-
-    let (message, wasabi_err) = de::from_reader::<_, 
WasabiError>(bs.clone().reader())
-        .map(|err| (format!("{err:?}"), Some(err)))
-        .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None));
-
-    if let Some(wasabi_err) = wasabi_err {
-        (kind, retryable) = 
parse_wasabi_error_code(&wasabi_err.code).unwrap_or((kind, retryable));
-    }
-
-    let mut err = Error::new(kind, &message);
-
-    err = with_error_response_context(err, parts);
-
-    if retryable {
-        err = err.set_temporary();
-    }
-
-    Ok(err)
-}
-
-/// Returns the `Error kind` of this code and whether the error is retryable.
-/// All possible error code: 
<https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList>
-pub fn parse_wasabi_error_code(code: &str) -> Option<(ErrorKind, bool)> {
-    match code {
-        // > Your socket connection to the server was not read from
-        // > or written to within the timeout period."
-        //
-        // It's Ok for us to retry it again.
-        "RequestTimeout" => Some((ErrorKind::Unexpected, true)),
-        // > An internal error occurred. Try again.
-        "InternalError" => Some((ErrorKind::Unexpected, true)),
-        // > A conflicting conditional operation is currently in progress
-        // > against this resource. Try again.
-        "OperationAborted" => Some((ErrorKind::Unexpected, true)),
-        // > Please reduce your request rate.
-        //
-        // It's Ok to retry since later on the request rate may get reduced.
-        "SlowDown" => Some((ErrorKind::RateLimited, true)),
-        // > Service is unable to handle request.
-        //
-        // ServiceUnavailable is considered a retryable error because it 
typically
-        // indicates a temporary issue with the service or server, such as 
high load,
-        // maintenance, or an internal problem.
-        "ServiceUnavailable" => Some((ErrorKind::Unexpected, true)),
-        _ => None,
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    /// Error response example is from 
https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
-    #[test]
-    fn test_parse_error() {
-        let bs = bytes::Bytes::from(
-            r#"
-<?xml version="1.0" encoding="UTF-8"?>
-<Error>
-  <Code>NoSuchKey</Code>
-  <Message>The resource you requested does not exist</Message>
-  <Resource>/mybucket/myfoto.jpg</Resource>
-  <RequestId>4442587FB7D0A2F9</RequestId>
-</Error>
-"#,
-        );
-
-        let out: WasabiError = de::from_reader(bs.reader()).expect("must 
success");
-        println!("{out:?}");
-
-        assert_eq!(out.code, "NoSuchKey");
-        assert_eq!(out.message, "The resource you requested does not exist");
-        assert_eq!(out.resource, "/mybucket/myfoto.jpg");
-        assert_eq!(out.request_id, "4442587FB7D0A2F9");
-    }
-}
diff --git a/core/src/services/wasabi/mod.rs b/core/src/services/wasabi/mod.rs
deleted file mode 100644
index 7356abc05..000000000
--- a/core/src/services/wasabi/mod.rs
+++ /dev/null
@@ -1,24 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-mod backend;
-pub use backend::WasabiBuilder as Wasabi;
-
-mod core;
-mod error;
-mod pager;
-mod writer;
diff --git a/core/src/services/wasabi/pager.rs 
b/core/src/services/wasabi/pager.rs
deleted file mode 100644
index 038d69a14..000000000
--- a/core/src/services/wasabi/pager.rs
+++ /dev/null
@@ -1,231 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::sync::Arc;
-
-use async_trait::async_trait;
-use bytes::Buf;
-use quick_xml::de;
-use serde::Deserialize;
-
-use super::core::WasabiCore;
-use super::error::parse_error;
-use crate::raw::*;
-use crate::EntryMode;
-use crate::Metadata;
-use crate::Result;
-
-pub struct WasabiPager {
-    core: Arc<WasabiCore>,
-
-    path: String,
-    delimiter: String,
-    limit: Option<usize>,
-
-    token: String,
-    done: bool,
-}
-
-impl WasabiPager {
-    pub fn new(core: Arc<WasabiCore>, path: &str, delimiter: &str, limit: 
Option<usize>) -> Self {
-        Self {
-            core,
-
-            path: path.to_string(),
-            delimiter: delimiter.to_string(),
-            limit,
-
-            token: "".to_string(),
-            done: false,
-        }
-    }
-}
-
-#[async_trait]
-impl oio::Page for WasabiPager {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        if self.done {
-            return Ok(None);
-        }
-
-        let resp = self
-            .core
-            .list_objects(&self.path, &self.token, &self.delimiter, self.limit)
-            .await?;
-
-        if resp.status() != http::StatusCode::OK {
-            return Err(parse_error(resp).await?);
-        }
-
-        let bs = resp.into_body().bytes().await?;
-
-        let output: Output = 
de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
-
-        // Try our best to check whether this list is done.
-        //
-        // - Check `is_truncated`
-        // - Check `next_continuation_token`
-        // - Check the length of `common_prefixes` and `contents` (very rarely 
case)
-        self.done = if let Some(is_truncated) = output.is_truncated {
-            !is_truncated
-        } else if let Some(next_continuation_token) = 
output.next_continuation_token.as_ref() {
-            next_continuation_token.is_empty()
-        } else {
-            output.common_prefixes.is_empty() && output.contents.is_empty()
-        };
-        self.token = 
output.next_continuation_token.clone().unwrap_or_default();
-
-        let mut entries = Vec::with_capacity(output.common_prefixes.len() + 
output.contents.len());
-
-        for prefix in output.common_prefixes {
-            let de = oio::Entry::new(
-                &build_rel_path(&self.core.root, &prefix.prefix),
-                Metadata::new(EntryMode::DIR),
-            );
-
-            entries.push(de);
-        }
-
-        for object in output.contents {
-            // s3 could return the dir itself in contents
-            // which endswith `/`.
-            // We should ignore them.
-            if object.key.ends_with('/') {
-                continue;
-            }
-
-            let mut meta = Metadata::new(EntryMode::FILE);
-
-            meta.set_etag(&object.etag);
-            meta.set_content_md5(object.etag.trim_matches('"'));
-            meta.set_content_length(object.size);
-
-            // object.last_modified provides more precious time that contains
-            // nanosecond, let's trim them.
-            
meta.set_last_modified(parse_datetime_from_rfc3339(object.last_modified.as_str())?);
-
-            let de = oio::Entry::new(&build_rel_path(&self.core.root, 
&object.key), meta);
-
-            entries.push(de);
-        }
-
-        Ok(Some(entries))
-    }
-}
-
-/// Output of ListBucket/ListObjects.
-///
-/// ## Note
-///
-/// Use `Option` in `is_truncated` and `next_continuation_token` to make
-/// the behavior more clear so that we can be compatible to more s3 services.
-///
-/// And enable `serde(default)` so that we can keep going even when some field
-/// is not exist.
-#[derive(Default, Debug, Deserialize)]
-#[serde(default, rename_all = "PascalCase")]
-struct Output {
-    is_truncated: Option<bool>,
-    next_continuation_token: Option<String>,
-    common_prefixes: Vec<OutputCommonPrefix>,
-    contents: Vec<OutputContent>,
-}
-
-#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
-#[serde(rename_all = "PascalCase")]
-struct OutputContent {
-    key: String,
-    size: u64,
-    last_modified: String,
-    #[serde(rename = "ETag")]
-    etag: String,
-}
-
-#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
-#[serde(rename_all = "PascalCase")]
-struct OutputCommonPrefix {
-    prefix: String,
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_parse_list_output() {
-        let bs = bytes::Bytes::from(
-            r#"<ListBucketResult 
xmlns="http://s3.amazonaws.com/doc/2006-03-01/";>
-  <Name>example-bucket</Name>
-  <Prefix>photos/2006/</Prefix>
-  <KeyCount>3</KeyCount>
-  <MaxKeys>1000</MaxKeys>
-  <Delimiter>/</Delimiter>
-  <IsTruncated>false</IsTruncated>
-  <Contents>
-    <Key>photos/2006</Key>
-    <LastModified>2016-04-30T23:51:29.000Z</LastModified>
-    <ETag>"d41d8cd98f00b204e9800998ecf8427e"</ETag>
-    <Size>56</Size>
-    <StorageClass>STANDARD</StorageClass>
-  </Contents>
-  <Contents>
-    <Key>photos/2007</Key>
-    <LastModified>2016-04-30T23:51:29.000Z</LastModified>
-    <ETag>"d41d8cd98f00b204e9800998ecf8427e"</ETag>
-    <Size>100</Size>
-    <StorageClass>STANDARD</StorageClass>
-  </Contents>
-
-  <CommonPrefixes>
-    <Prefix>photos/2006/February/</Prefix>
-  </CommonPrefixes>
-  <CommonPrefixes>
-    <Prefix>photos/2006/January/</Prefix>
-  </CommonPrefixes>
-</ListBucketResult>"#,
-        );
-
-        let out: Output = de::from_reader(bs.reader()).expect("must success");
-
-        assert!(!out.is_truncated.unwrap());
-        assert!(out.next_continuation_token.is_none());
-        assert_eq!(
-            out.common_prefixes
-                .iter()
-                .map(|v| v.prefix.clone())
-                .collect::<Vec<String>>(),
-            vec!["photos/2006/February/", "photos/2006/January/"]
-        );
-        assert_eq!(
-            out.contents,
-            vec![
-                OutputContent {
-                    key: "photos/2006".to_string(),
-                    size: 56,
-                    etag: "\"d41d8cd98f00b204e9800998ecf8427e\"".to_string(),
-                    last_modified: "2016-04-30T23:51:29.000Z".to_string(),
-                },
-                OutputContent {
-                    key: "photos/2007".to_string(),
-                    size: 100,
-                    last_modified: "2016-04-30T23:51:29.000Z".to_string(),
-                    etag: "\"d41d8cd98f00b204e9800998ecf8427e\"".to_string(),
-                }
-            ]
-        )
-    }
-}
diff --git a/core/src/services/wasabi/writer.rs 
b/core/src/services/wasabi/writer.rs
deleted file mode 100644
index 50fc8ff4f..000000000
--- a/core/src/services/wasabi/writer.rs
+++ /dev/null
@@ -1,65 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::sync::Arc;
-
-use async_trait::async_trait;
-use http::StatusCode;
-
-use super::core::*;
-use super::error::parse_error;
-use crate::raw::oio::WriteBuf;
-use crate::raw::*;
-use crate::*;
-
-pub struct WasabiWriter {
-    core: Arc<WasabiCore>,
-
-    op: OpWrite,
-    path: String,
-}
-
-impl WasabiWriter {
-    pub fn new(core: Arc<WasabiCore>, op: OpWrite, path: String) -> Self {
-        WasabiWriter { core, op, path }
-    }
-}
-
-#[async_trait]
-impl oio::OneShotWrite for WasabiWriter {
-    async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
-        let bs = 
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
-
-        let resp = self
-            .core
-            .put_object(
-                &self.path,
-                Some(bs.len()),
-                &self.op,
-                AsyncBody::ChunkedBytes(bs),
-            )
-            .await?;
-
-        match resp.status() {
-            StatusCode::CREATED | StatusCode::OK => {
-                resp.into_body().consume().await?;
-                Ok(())
-            }
-            _ => Err(parse_error(resp).await?),
-        }
-    }
-}
diff --git a/core/src/types/operator/builder.rs 
b/core/src/types/operator/builder.rs
index 211444064..9a70ab0be 100644
--- a/core/src/types/operator/builder.rs
+++ b/core/src/types/operator/builder.rs
@@ -233,8 +233,6 @@ impl Operator {
             Scheme::Tikv => Self::from_map::<services::Tikv>(map)?.finish(),
             #[cfg(feature = "services-vercel-artifacts")]
             Scheme::VercelArtifacts => 
Self::from_map::<services::VercelArtifacts>(map)?.finish(),
-            #[cfg(feature = "services-wasabi")]
-            Scheme::Wasabi => 
Self::from_map::<services::Wasabi>(map)?.finish(),
             #[cfg(feature = "services-webdav")]
             Scheme::Webdav => 
Self::from_map::<services::Webdav>(map)?.finish(),
             #[cfg(feature = "services-webhdfs")]
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index 32b84c04c..63f331dc5 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -113,8 +113,6 @@ pub enum Scheme {
     Supabase,
     /// [Vercel Artifacts][crate::services::VercelArtifacts]: Vercel Artifacts 
service, as known as Vercel Remote Caching.
     VercelArtifacts,
-    /// [wasabi][crate::services::Wasabi]: Wasabi service
-    Wasabi,
     /// [webdav][crate::services::Webdav]: WebDAV support.
     Webdav,
     /// [webhdfs][crate::services::Webhdfs]: WebHDFS RESTful API Services
@@ -237,8 +235,6 @@ impl Scheme {
             Scheme::Tikv,
             #[cfg(feature = "services-vercel-artifacts")]
             Scheme::VercelArtifacts,
-            #[cfg(feature = "services-wasabi")]
-            Scheme::Wasabi,
             #[cfg(feature = "services-webdav")]
             Scheme::Webdav,
             #[cfg(feature = "services-webhdfs")]
@@ -313,7 +309,6 @@ impl FromStr for Scheme {
             "supabase" => Ok(Scheme::Supabase),
             "oss" => Ok(Scheme::Oss),
             "vercel_artifacts" => Ok(Scheme::VercelArtifacts),
-            "wasabi" => Ok(Scheme::Wasabi),
             "webdav" => Ok(Scheme::Webdav),
             "webhdfs" => Ok(Scheme::Webhdfs),
             "tikv" => Ok(Scheme::Tikv),
@@ -366,7 +361,6 @@ impl From<Scheme> for &'static str {
             Scheme::Supabase => "supabase",
             Scheme::VercelArtifacts => "vercel_artifacts",
             Scheme::Oss => "oss",
-            Scheme::Wasabi => "wasabi",
             Scheme::Webdav => "webdav",
             Scheme::Webhdfs => "webhdfs",
             Scheme::Redb => "redb",


Reply via email to