This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 7b0e3af2 feat: Add wasabi service implementation (#2004)
7b0e3af2 is described below
commit 7b0e3af28a4b9e3dbc7815dbf172f636074dcf6d
Author: Eric <[email protected]>
AuthorDate: Mon Apr 17 12:59:04 2023 +0800
feat: Add wasabi service implementation (#2004)
* add wasabi service implementation
* sync change from s3, format files
* re-implement wasabi service based on review comments
* fix ci build
---
core/Cargo.toml | 5 +
core/src/docs/concepts.rs | 2 +-
core/src/raw/accessor.rs | 2 +-
core/src/services/mod.rs | 5 +
core/src/services/s3/core.rs | 17 +-
core/src/services/wasabi/backend.rs | 1164 ++++++++++++++++++++++++++++++
core/src/services/{s3 => wasabi}/core.rs | 152 ++--
core/src/services/wasabi/error.rs | 107 +++
core/src/services/wasabi/mod.rs | 24 +
core/src/services/wasabi/pager.rs | 231 ++++++
core/src/services/wasabi/writer.rs | 107 +++
core/src/types/scheme.rs | 4 +
12 files changed, 1756 insertions(+), 64 deletions(-)
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 3484ec82..f8b3ec78 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -130,6 +130,11 @@ services-s3 = [
"reqsign?/reqwest_request",
]
services-sled = ["dep:sled"]
+services-wasabi = [
+ "dep:reqsign",
+ "reqsign?/services-aws",
+ "reqsign?/reqwest_request",
+]
services-webdav = []
services-webhdfs = []
diff --git a/core/src/docs/concepts.rs b/core/src/docs/concepts.rs
index 5e87a988..aa01e6e9 100644
--- a/core/src/docs/concepts.rs
+++ b/core/src/docs/concepts.rs
@@ -19,7 +19,7 @@
//!
//! OpenDAL provides a unified abstraction for all storage services.
//!
-//! There are three core concepts in OpenDAL:
+//! There are two core concepts in OpenDAL:
//!
//! - [`Builder`]: Build an instance of underlying services.
//! - [`Operator`]: A bridge between underlying implementation detail and
unified abstraction.
diff --git a/core/src/raw/accessor.rs b/core/src/raw/accessor.rs
index 4bf41c39..84d4b7aa 100644
--- a/core/src/raw/accessor.rs
+++ b/core/src/raw/accessor.rs
@@ -53,7 +53,7 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static {
/// BlockingReader is the associated reader that could return in
/// `blocking_read` operation.
type BlockingReader: oio::BlockingRead;
- /// Reader is the associated writer the could return in `write` operation.
+ /// Writer is the associated writer that could return in `write` operation.
type Writer: oio::Write;
/// BlockingWriter is the associated writer that could return in
/// `blocking_write` operation.
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index 8dff87f3..4c1b7397 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -119,6 +119,11 @@ mod sled;
#[cfg(feature = "services-sled")]
pub use self::sled::Sled;
+#[cfg(feature = "services-wasabi")]
+mod wasabi;
+#[cfg(feature = "services-wasabi")]
+pub use wasabi::Wasabi;
+
#[cfg(feature = "services-webdav")]
mod webdav;
#[cfg(feature = "services-webdav")]
diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs
index 4f099188..e236f2e5 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/s3/core.rs
@@ -342,22 +342,7 @@ impl S3Core {
path: &str,
if_none_match: Option<&str>,
) -> Result<Response<IncomingAsyncBody>> {
- let p = build_abs_path(&self.root, path);
-
- let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
-
- let mut req = Request::head(&url);
-
- // Set SSE headers.
- req = self.insert_sse_headers(req, false);
-
- if let Some(if_none_match) = if_none_match {
- req = req.header(IF_NONE_MATCH, if_none_match);
- }
-
- let mut req = req
- .body(AsyncBody::Empty)
- .map_err(new_request_build_error)?;
+ let mut req = self.s3_head_object_request(path, if_none_match)?;
self.sign(&mut req).await?;
diff --git a/core/src/services/wasabi/backend.rs
b/core/src/services/wasabi/backend.rs
new file mode 100644
index 00000000..de4735b8
--- /dev/null
+++ b/core/src/services/wasabi/backend.rs
@@ -0,0 +1,1164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::fmt::Write;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use base64::prelude::BASE64_STANDARD;
+use base64::Engine;
+use bytes::Buf;
+use http::StatusCode;
+use log::debug;
+use md5::Digest;
+use md5::Md5;
+use once_cell::sync::Lazy;
+use reqsign::AwsConfig;
+use reqsign::AwsCredentialLoad;
+use reqsign::AwsLoader;
+use reqsign::AwsV4Signer;
+
+use super::core::*;
+use super::error::parse_error;
+use super::pager::WasabiPager;
+use super::writer::WasabiWriter;
+use crate::ops::*;
+use crate::raw::*;
+use crate::*;
+
+/// Allow constructing correct region endpoint if user gives a global endpoint.
+static ENDPOINT_TEMPLATES: Lazy<HashMap<&'static str, &'static str>> =
Lazy::new(|| {
+ let mut m = HashMap::new();
+ // AWS S3 Service.
+ m.insert(
+ "https://s3.wasabisys.com",
+ "https://s3.{region}.wasabisys.com",
+ );
+ m
+});
+
+/// Wasabi (an aws S3 compatible service) support
+///
+/// # Capabilities
+///
+/// This service can be used to:
+///
+/// - [x] read
+/// - [x] write
+/// - [x] copy
+/// - [x] list
+/// - [x] scan
+/// - [x] presign
+/// - [x] rename
+/// - [ ] blocking
+///
+/// # Configuration
+///
+/// - `root`: Set the work dir for backend.
+/// - `bucket`: Set the container name for backend.
+/// - `endpoint`: Set the endpoint for backend.
+/// - `region`: Set the region for backend.
+/// - `access_key_id`: Set the access_key_id for backend.
+/// - `secret_access_key`: Set the secret_access_key for backend.
+/// - `security_token`: Set the security_token for backend.
+/// - `default_storage_class`: Set the default storage_class for backend.
+/// - `server_side_encryption`: Set the server_side_encryption for backend.
+/// - `server_side_encryption_aws_kms_key_id`: Set the
server_side_encryption_aws_kms_key_id for backend.
+/// - `server_side_encryption_customer_algorithm`: Set the
server_side_encryption_customer_algorithm for backend.
+/// - `server_side_encryption_customer_key`: Set the
server_side_encryption_customer_key for backend.
+/// - `server_side_encryption_customer_key_md5`: Set the
server_side_encryption_customer_key_md5 for backend.
+/// - `disable_config_load`: Disable aws config load from env
+/// - `enable_virtual_host_style`: Enable virtual host style.
+///
+/// Refer to [`WasabiBuilder`]'s public API docs for more information.
+///
+/// # Temporary security credentials
+///
+/// OpenDAL now provides support for S3 temporary security credentials in IAM.
+///
+/// The way to take advantage of this feature is to build your S3 backend with
`Builder::security_token`.
+///
+/// But OpenDAL will not refresh the temporary security credentials, please
keep in mind to refresh those credentials in time.
+///
+/// # Server Side Encryption
+///
+/// OpenDAL provides full support of S3 Server Side Encryption(SSE) features.
+///
+/// The easiest way to configure them is to use helper functions like
+///
+/// - SSE-KMS: `server_side_encryption_with_aws_managed_kms_key`
+/// - SSE-KMS: `server_side_encryption_with_customer_managed_kms_key`
+/// - SSE-S3: `server_side_encryption_with_s3_key`
+/// - SSE-C: `server_side_encryption_with_customer_key`
+///
+/// If those functions don't fulfill need, low-level options are also provided:
+///
+/// - Use service managed kms key
+/// - `server_side_encryption="aws:kms"`
+/// - Use customer provided kms key
+/// - `server_side_encryption="aws:kms"`
+/// - `server_side_encryption_aws_kms_key_id="your-kms-key"`
+/// - Use S3 managed key
+/// - `server_side_encryption="AES256"`
+/// - Use customer key
+/// - `server_side_encryption_customer_algorithm="AES256"`
+/// - `server_side_encryption_customer_key="base64-of-your-aes256-key"`
+/// -
`server_side_encryption_customer_key_md5="base64-of-your-aes256-key-md5"`
+///
+/// After SSE has been configured, all requests sent by this backend will
attach those headers.
+///
+/// Reference: [Protecting data using server-side
encryption](https://docs.aws.amazon.com/AmazonS3/latest/userguide/serv-side-encryption.html)
+///
+/// # Example
+///
+/// ## Basic Setup
+///
+/// ```no_run
+/// use std::sync::Arc;
+///
+/// use anyhow::Result;
+/// use opendal::services::Wasabi;
+/// use opendal::Operator;
+///
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+/// // Create s3 backend builder.
+/// let mut builder = Wasabi::default();
+/// // Set the root for s3, all operations will happen under this root.
+/// //
+/// // NOTE: the root must be absolute path.
+/// builder.root("/path/to/dir");
+/// // Set the bucket name, this is required.
+/// builder.bucket("test");
+/// // Set the endpoint.
+/// //
+/// // For examples:
+/// // - "https://s3.wasabisys.com"
+/// // - "http://127.0.0.1:9000"
+/// // - "https://oss-ap-northeast-1.aliyuncs.com"
+/// // - "https://cos.ap-seoul.myqcloud.com"
+/// //
+/// // Default to "https://s3.wasabisys.com"
+/// builder.endpoint("https://s3.wasabisys.com");
+/// // Set the access_key_id and secret_access_key.
+/// //
+/// // OpenDAL will try load credential from the env.
+/// // If credential not set and no valid credential in env, OpenDAL will
+/// // send request without signing like anonymous user.
+/// builder.access_key_id("access_key_id");
+/// builder.secret_access_key("secret_access_key");
+///
+/// let op: Operator = Operator::new(builder)?.finish();
+///
+/// Ok(())
+/// }
+/// ```
+///
+/// ## Wasabi with SSE-C
+///
+/// ```no_run
+/// use anyhow::Result;
+/// use log::info;
+/// use opendal::services::Wasabi;
+/// use opendal::Operator;
+///
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+/// let mut builder = Wasabi::default();
+///
+/// // Setup builders
+///
+/// // Enable SSE-C
+/// builder.server_side_encryption_with_customer_key("AES256",
"customer_key".as_bytes());
+///
+/// let op = Operator::new(builder)?.finish();
+/// info!("operator: {:?}", op);
+///
+/// // Writing your testing code here.
+///
+/// Ok(())
+/// }
+/// ```
+///
+/// ## Wasabi with SSE-KMS and aws managed kms key
+///
+/// ```no_run
+/// use anyhow::Result;
+/// use log::info;
+/// use opendal::services::Wasabi;
+/// use opendal::Operator;
+///
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+/// let mut builder = Wasabi::default();
+///
+/// // Setup builders
+///
+/// // Enable SSE-KMS with aws managed kms key
+/// builder.server_side_encryption_with_aws_managed_kms_key();
+///
+/// let op = Operator::new(builder)?.finish();
+/// info!("operator: {:?}", op);
+///
+/// // Writing your testing code here.
+///
+/// Ok(())
+/// }
+/// ```
+///
+/// ## Wasabi with SSE-KMS and customer managed kms key
+///
+/// ```no_run
+/// use anyhow::Result;
+/// use log::info;
+/// use opendal::services::Wasabi;
+/// use opendal::Operator;
+///
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+/// let mut builder = Wasabi::default();
+///
+/// // Setup builders
+///
+/// // Enable SSE-KMS with customer managed kms key
+///
builder.server_side_encryption_with_customer_managed_kms_key("aws_kms_key_id");
+///
+/// let op = Operator::new(builder)?.finish();
+/// info!("operator: {:?}", op);
+///
+/// // Writing your testing code here.
+///
+/// Ok(())
+/// }
+/// ```
+///
+/// ## Wasabi with SSE-S3
+///
+/// ```no_run
+/// use anyhow::Result;
+/// use log::info;
+/// use opendal::services::Wasabi;
+/// use opendal::Operator;
+///
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+/// let mut builder = Wasabi::default();
+///
+/// // Setup builders
+///
+/// // Enable SSE-S3
+/// builder.server_side_encryption_with_s3_key();
+///
+/// let op = Operator::new(builder)?.finish();
+/// info!("operator: {:?}", op);
+///
+/// // Writing your testing code here.
+///
+/// Ok(())
+/// }
+/// ```
+#[derive(Default)]
+pub struct WasabiBuilder {
+ root: Option<String>,
+
+ bucket: String,
+ endpoint: Option<String>,
+ region: Option<String>,
+ role_arn: Option<String>,
+ external_id: Option<String>,
+ access_key_id: Option<String>,
+ secret_access_key: Option<String>,
+ server_side_encryption: Option<String>,
+ server_side_encryption_aws_kms_key_id: Option<String>,
+ server_side_encryption_customer_algorithm: Option<String>,
+ server_side_encryption_customer_key: Option<String>,
+ server_side_encryption_customer_key_md5: Option<String>,
+ default_storage_class: Option<String>,
+
+ /// temporary credentials, check the official
[doc](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html)
for detail
+ security_token: Option<String>,
+
+ disable_config_load: bool,
+ disable_ec2_metadata: bool,
+ enable_virtual_host_style: bool,
+
+ http_client: Option<HttpClient>,
+ customed_credential_load: Option<Box<dyn AwsCredentialLoad>>,
+}
+
+impl Debug for WasabiBuilder {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let mut d = f.debug_struct("Builder");
+
+ d.field("root", &self.root)
+ .field("bucket", &self.bucket)
+ .field("endpoint", &self.endpoint)
+ .field("region", &self.region);
+
+ d.finish_non_exhaustive()
+ }
+}
+
+impl WasabiBuilder {
+ /// Set root of this backend.
+ ///
+ /// All operations will happen under this root.
+ pub fn root(&mut self, root: &str) -> &mut Self {
+ self.root = if root.is_empty() {
+ None
+ } else {
+ Some(root.to_string())
+ };
+
+ self
+ }
+
+ /// Set bucket name of this backend.
+ pub fn bucket(&mut self, bucket: &str) -> &mut Self {
+ self.bucket = bucket.to_string();
+
+ self
+ }
+
+ /// Set endpoint of this backend.
+ ///
+ /// Endpoint must be full uri, e.g.
+ ///
+ /// - `https://s3.wasabisys.com` or `https://s3.{region}.wasabisys.com`
+ ///
+ /// If user inputs endpoint without scheme like "s3.wasabisys.com", we
+ /// will prepend "https://" before it.
+ pub fn endpoint(&mut self, endpoint: &str) -> &mut Self {
+ if !endpoint.is_empty() {
+ // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
+ self.endpoint = Some(endpoint.trim_end_matches('/').to_string())
+ }
+
+ self
+ }
+
+ /// Region represent the signing region of this endpoint.
+ ///
+ /// - If region is set, we will take user's input first.
+ /// - If not, the default `us-east-1` will be used.
+ pub fn region(&mut self, region: &str) -> &mut Self {
+ if !region.is_empty() {
+ self.region = Some(region.to_string())
+ }
+
+ self
+ }
+
+ /// Set access_key_id of this backend.
+ ///
+ /// - If access_key_id is set, we will take user's input first.
+ /// - If not, we will try to load it from environment.
+ pub fn access_key_id(&mut self, v: &str) -> &mut Self {
+ if !v.is_empty() {
+ self.access_key_id = Some(v.to_string())
+ }
+
+ self
+ }
+
+ /// Set secret_access_key of this backend.
+ ///
+ /// - If secret_access_key is set, we will take user's input first.
+ /// - If not, we will try to load it from environment.
+ pub fn secret_access_key(&mut self, v: &str) -> &mut Self {
+ if !v.is_empty() {
+ self.secret_access_key = Some(v.to_string())
+ }
+
+ self
+ }
+
+ /// Set role_arn for this backend.
+ pub fn role_arn(&mut self, v: &str) -> &mut Self {
+ if !v.is_empty() {
+ self.role_arn = Some(v.to_string())
+ }
+
+ self
+ }
+
+ /// Set external_id for this backend.
+ pub fn external_id(&mut self, v: &str) -> &mut Self {
+ if !v.is_empty() {
+ self.external_id = Some(v.to_string())
+ }
+
+ self
+ }
+
+ /// Set default storage_class for this backend.
+ /// Unlike S3, wasabi only supports one single storage class,
+ /// which is most like standard S3 storage class,
+ /// check `https://docs.wasabi.com/docs/operations-on-objects` for more
details.
+ ///
+ /// Available values:
+ /// - `STANDARD`
+ pub fn default_storage_class(&mut self, v: &str) -> &mut Self {
+ if !v.is_empty() {
+ self.default_storage_class = Some(v.to_string())
+ }
+
+ self
+ }
+
+ /// Set server_side_encryption for this backend.
+ ///
+ /// Available values: `AES256`, `aws:kms`.
+ ///
+ /// # Note
+ ///
+ /// This function is the low-level setting for SSE related features.
+ ///
+ /// SSE related options should be set carefully to make them work.
+ /// Please use `server_side_encryption_with_*` helpers whenever possible.
+ pub fn server_side_encryption(&mut self, v: &str) -> &mut Self {
+ if !v.is_empty() {
+ self.server_side_encryption = Some(v.to_string())
+ }
+
+ self
+ }
+
+ /// Set server_side_encryption_aws_kms_key_id for this backend
+ ///
+ /// - If `server_side_encryption` set to `aws:kms`, and
`server_side_encryption_aws_kms_key_id`
+ /// is not set, S3 will use aws managed kms key to encrypt data.
+ /// - If `server_side_encryption` set to `aws:kms`, and
`server_side_encryption_aws_kms_key_id`
+ /// is a valid kms key id, S3 will use the provided kms key to encrypt
data.
+ /// - If the `server_side_encryption_aws_kms_key_id` is invalid or not
found, an error will be
+ /// returned.
+ /// - If `server_side_encryption` is not `aws:kms`, setting
`server_side_encryption_aws_kms_key_id`
+ /// is a noop.
+ ///
+ /// # Note
+ ///
+ /// This function is the low-level setting for SSE related features.
+ ///
+ /// SSE related options should be set carefully to make them work.
+ /// Please use `server_side_encryption_with_*` helpers whenever possible.
+ pub fn server_side_encryption_aws_kms_key_id(&mut self, v: &str) -> &mut
Self {
+ if !v.is_empty() {
+ self.server_side_encryption_aws_kms_key_id = Some(v.to_string())
+ }
+
+ self
+ }
+
+ /// Set server_side_encryption_customer_algorithm for this backend.
+ ///
+ /// Available values: `AES256`.
+ ///
+ /// # Note
+ ///
+ /// This function is the low-level setting for SSE related features.
+ ///
+ /// SSE related options should be set carefully to make them work.
+ /// Please use `server_side_encryption_with_*` helpers whenever possible.
+ pub fn server_side_encryption_customer_algorithm(&mut self, v: &str) ->
&mut Self {
+ if !v.is_empty() {
+ self.server_side_encryption_customer_algorithm =
Some(v.to_string())
+ }
+
+ self
+ }
+
+ /// Set server_side_encryption_customer_key for this backend.
+ ///
+ /// # Args
+ ///
+ /// `v`: base64 encoded key that matches algorithm specified in
+ /// `server_side_encryption_customer_algorithm`.
+ ///
+ /// # Note
+ ///
+ /// This function is the low-level setting for SSE related features.
+ ///
+ /// SSE related options should be set carefully to make them work.
+ /// Please use `server_side_encryption_with_*` helpers whenever possible.
+ pub fn server_side_encryption_customer_key(&mut self, v: &str) -> &mut
Self {
+ if !v.is_empty() {
+ self.server_side_encryption_customer_key = Some(v.to_string())
+ }
+
+ self
+ }
+
+ /// Set server_side_encryption_customer_key_md5 for this backend.
+ ///
+ /// # Args
+ ///
+ /// `v`: MD5 digest of key specified in
`server_side_encryption_customer_key`.
+ ///
+ /// # Note
+ ///
+ /// This function is the low-level setting for SSE related features.
+ ///
+ /// SSE related options should be set carefully to make them work.
+ /// Please use `server_side_encryption_with_*` helpers whenever possible.
+ pub fn server_side_encryption_customer_key_md5(&mut self, v: &str) -> &mut
Self {
+ if !v.is_empty() {
+ self.server_side_encryption_customer_key_md5 = Some(v.to_string())
+ }
+
+ self
+ }
+
+ /// Enable server side encryption with aws managed kms key
+ ///
+ /// Also known as: SSE-KMS
+ ///
+ /// NOTE: This function should not be used along with other
`server_side_encryption_with_` functions.
+ pub fn server_side_encryption_with_aws_managed_kms_key(&mut self) -> &mut
Self {
+ self.server_side_encryption = Some("aws:kms".to_string());
+ self
+ }
+
+ /// Enable server side encryption with customer managed kms key
+ ///
+ /// Also known as: SSE-KMS
+ ///
+ /// NOTE: This function should not be used along with other
`server_side_encryption_with_` functions.
+ pub fn server_side_encryption_with_customer_managed_kms_key(
+ &mut self,
+ aws_kms_key_id: &str,
+ ) -> &mut Self {
+ self.server_side_encryption = Some("aws:kms".to_string());
+ self.server_side_encryption_aws_kms_key_id =
Some(aws_kms_key_id.to_string());
+ self
+ }
+
+ /// Enable server side encryption with s3 managed key
+ ///
+ /// Also known as: SSE-S3
+ ///
+ /// NOTE: This function should not be used along with other
`server_side_encryption_with_` functions.
+ pub fn server_side_encryption_with_s3_key(&mut self) -> &mut Self {
+ self.server_side_encryption = Some("AES256".to_string());
+ self
+ }
+
+ /// Enable server side encryption with customer key.
+ ///
+ /// Also known as: SSE-C
+ ///
+ /// NOTE: This function should not be used along with other
`server_side_encryption_with_` functions.
+ pub fn server_side_encryption_with_customer_key(
+ &mut self,
+ algorithm: &str,
+ key: &[u8],
+ ) -> &mut Self {
+ self.server_side_encryption_customer_algorithm =
Some(algorithm.to_string());
+ self.server_side_encryption_customer_key =
Some(BASE64_STANDARD.encode(key));
+ self.server_side_encryption_customer_key_md5 =
+ Some(BASE64_STANDARD.encode(Md5::digest(key).as_slice()));
+ self
+ }
+
+ /// Set temporary credential used in service connections
+ ///
+ /// # Warning
+ ///
+ /// security token's lifetime is short and requires users to refresh in
time.
+ pub fn security_token(&mut self, token: &str) -> &mut Self {
+ if !token.is_empty() {
+ self.security_token = Some(token.to_string());
+ }
+ self
+ }
+
+ /// Disable config load so that opendal will not load config from
+ /// environment.
+ ///
+ /// For examples:
+ ///
+ /// - envs like `AWS_ACCESS_KEY_ID`
+ /// - files like `~/.aws/config`
+ pub fn disable_config_load(&mut self) -> &mut Self {
+ self.disable_config_load = true;
+ self
+ }
+
+ /// Disable load credential from ec2 metadata.
+ ///
+ /// This option is used to disable the default behavior of opendal
+ /// to load credential from ec2 metadata, a.k.a, IMDSv2
+ pub fn disable_ec2_metadata(&mut self) -> &mut Self {
+ self.disable_ec2_metadata = true;
+ self
+ }
+
+ /// Enable virtual host style so that opendal will send API requests
+ /// in virtual host style instead of path style.
+ ///
+ /// - By default, opendal will send API to
`https://s3.us-east-1.wasabisys.com/bucket_name`
+ /// - Enabled, opendal will send API to
`https://bucket_name.s3.us-east-1.wasabisys.com`
+ pub fn enable_virtual_host_style(&mut self) -> &mut Self {
+ self.enable_virtual_host_style = true;
+ self
+ }
+
+ /// Adding a customed credential load for service.
+ pub fn customed_credential_load(&mut self, cred: Box<dyn
AwsCredentialLoad>) -> &mut Self {
+ self.customed_credential_load = Some(cred);
+ self
+ }
+
+ /// Specify the http client that used by this service.
+ ///
+ /// # Notes
+ ///
+ /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
+ /// during minor updates.
+ pub fn http_client(&mut self, client: HttpClient) -> &mut Self {
+ self.http_client = Some(client);
+ self
+ }
+
+ /// Check if `bucket` is valid
+ /// `bucket` must be not empty and if `enable_virtual_host_style` is true
+ /// it couldn't contain dot(.) character
+ fn is_bucket_valid(&self) -> bool {
+ if self.bucket.is_empty() {
+ return false;
+ }
+ // If enable virtual host style, `bucket` will reside in domain part,
+ // for example `https://bucket_name.s3.us-east-1.amazonaws.com`,
+ // so `bucket` with dot can't be recognized correctly for this format.
+ if self.enable_virtual_host_style && self.bucket.contains('.') {
+ return false;
+ }
+ true
+ }
+
+ /// Build endpoint with given region.
+ fn build_endpoint(&self, region: &str) -> String {
+ let bucket = {
+ debug_assert!(self.is_bucket_valid(), "bucket must be valid");
+
+ self.bucket.as_str()
+ };
+
+ let mut endpoint = match &self.endpoint {
+ Some(endpoint) => {
+ if endpoint.starts_with("http") {
+ endpoint.to_string()
+ } else {
+ // Prefix https if endpoint doesn't start with scheme.
+ format!("https://{endpoint}")
+ }
+ }
+ None => "https://s3.wasabisys.com".to_string(),
+ };
+
+ // If endpoint contains bucket name, we should trim them.
+ endpoint = endpoint.replace(&format!("//{bucket}."), "//");
+
+ // Update with endpoint templates.
+ endpoint = if let Some(template) =
ENDPOINT_TEMPLATES.get(endpoint.as_str()) {
+ template.replace("{region}", region)
+ } else {
+ // If we don't know where about this endpoint, just leave
+ // them as it.
+ endpoint.to_string()
+ };
+
+ // Apply virtual host style.
+ if self.enable_virtual_host_style {
+ endpoint = endpoint.replace("//", &format!("//{bucket}."))
+ } else {
+ write!(endpoint, "/{bucket}").expect("write into string must
succeed");
+ };
+
+ endpoint
+ }
+}
+
+impl Builder for WasabiBuilder {
+ const SCHEME: Scheme = Scheme::Wasabi;
+ type Accessor = WasabiBackend;
+
+ fn from_map(map: HashMap<String, String>) -> Self {
+ let mut builder = WasabiBuilder::default();
+
+ map.get("root").map(|v| builder.root(v));
+ map.get("bucket").map(|v| builder.bucket(v));
+ map.get("endpoint").map(|v| builder.endpoint(v));
+ map.get("region").map(|v| builder.region(v));
+ map.get("access_key_id").map(|v| builder.access_key_id(v));
+ map.get("secret_access_key")
+ .map(|v| builder.secret_access_key(v));
+ map.get("security_token").map(|v| builder.security_token(v));
+ map.get("role_arn").map(|v| builder.role_arn(v));
+ map.get("external_id").map(|v| builder.external_id(v));
+ map.get("server_side_encryption")
+ .map(|v| builder.server_side_encryption(v));
+ map.get("server_side_encryption_aws_kms_key_id")
+ .map(|v| builder.server_side_encryption_aws_kms_key_id(v));
+ map.get("server_side_encryption_customer_algorithm")
+ .map(|v| builder.server_side_encryption_customer_algorithm(v));
+ map.get("server_side_encryption_customer_key")
+ .map(|v| builder.server_side_encryption_customer_key(v));
+ map.get("server_side_encryption_customer_key_md5")
+ .map(|v| builder.server_side_encryption_customer_key_md5(v));
+ map.get("disable_config_load")
+ .filter(|v| *v == "on" || *v == "true")
+ .map(|_| builder.disable_config_load());
+ map.get("disable_ec2_metadata")
+ .filter(|v| *v == "on" || *v == "true")
+ .map(|_| builder.disable_ec2_metadata());
+ map.get("enable_virtual_host_style")
+ .filter(|v| *v == "on" || *v == "true")
+ .map(|_| builder.enable_virtual_host_style());
+ map.get("default_storage_class")
+ .map(|v| builder.default_storage_class(v));
+
+ builder
+ }
+
+ fn build(&mut self) -> Result<Self::Accessor> {
+ debug!("backend build started: {:?}", &self);
+
+ let root = normalize_root(&self.root.take().unwrap_or_default());
+ debug!("backend use root {}", &root);
+
+ // Handle bucket name.
+ let bucket = if self.is_bucket_valid() {
+ Ok(&self.bucket)
+ } else {
+ Err(
+ Error::new(ErrorKind::ConfigInvalid, "The bucket is
misconfigured")
+ .with_context("service", Scheme::S3),
+ )
+ }?;
+ debug!("backend use bucket {}", &bucket);
+
+ let default_storage_class = match &self.default_storage_class {
+ None => None,
+ Some(v) => Some(
+ build_header_value(v).map_err(|err| err.with_context("key",
"storage_class"))?,
+ ),
+ };
+
+ let server_side_encryption = match &self.server_side_encryption {
+ None => None,
+ Some(v) => Some(
+ build_header_value(v)
+ .map_err(|err| err.with_context("key",
"server_side_encryption"))?,
+ ),
+ };
+
+ let server_side_encryption_aws_kms_key_id =
+ match &self.server_side_encryption_aws_kms_key_id {
+ None => None,
+ Some(v) => Some(build_header_value(v).map_err(|err| {
+ err.with_context("key",
"server_side_encryption_aws_kms_key_id")
+ })?),
+ };
+
+ let server_side_encryption_customer_algorithm =
+ match &self.server_side_encryption_customer_algorithm {
+ None => None,
+ Some(v) => Some(build_header_value(v).map_err(|err| {
+ err.with_context("key",
"server_side_encryption_customer_algorithm")
+ })?),
+ };
+
+ let server_side_encryption_customer_key =
+ match &self.server_side_encryption_customer_key {
+ None => None,
+ Some(v) => Some(build_header_value(v).map_err(|err| {
+ err.with_context("key",
"server_side_encryption_customer_key")
+ })?),
+ };
+
+ let server_side_encryption_customer_key_md5 =
+ match &self.server_side_encryption_customer_key_md5 {
+ None => None,
+ Some(v) => Some(build_header_value(v).map_err(|err| {
+ err.with_context("key",
"server_side_encryption_customer_key_md5")
+ })?),
+ };
+
+ let client = if let Some(client) = self.http_client.take() {
+ client
+ } else {
+ HttpClient::new().map_err(|err| {
+ err.with_operation("Builder::build")
+ .with_context("service", Scheme::S3)
+ })?
+ };
+
+ let mut cfg = AwsConfig::default();
+ if !self.disable_config_load {
+ cfg = cfg.from_profile();
+ cfg = cfg.from_env();
+ }
+
+ // Setting all value from user input if available.
+ if let Some(v) = self.region.take() {
+ cfg.region = Some(v);
+ }
+ if let Some(v) = self.access_key_id.take() {
+ cfg.access_key_id = Some(v)
+ }
+ if let Some(v) = self.secret_access_key.take() {
+ cfg.secret_access_key = Some(v)
+ }
+ if let Some(v) = self.security_token.take() {
+ cfg.session_token = Some(v)
+ }
+ if let Some(v) = self.role_arn.take() {
+ cfg.role_arn = Some(v)
+ }
+ if let Some(v) = self.external_id.take() {
+ cfg.external_id = Some(v)
+ }
+
+ if cfg.region.is_none() {
+ // region is required to make signer work.
+ //
+ // If we don't know region after loading from builder and env.
+ // We will use `us-east-1` as default.
+ cfg.region = Some("us-east-1".to_string());
+ }
+
+ let region = cfg.region.to_owned().unwrap();
+ debug!("backend use region: {region}");
+
+ // Building endpoint.
+ let endpoint = self.build_endpoint(®ion);
+ debug!("backend use endpoint: {endpoint}");
+
+ let mut loader = AwsLoader::new(client.client(),
cfg).with_allow_anonymous();
+ if self.disable_ec2_metadata {
+ loader = loader.with_disable_ec2_metadata();
+ }
+ if let Some(v) = self.customed_credential_load.take() {
+ loader = loader.with_customed_credential_loader(v);
+ }
+
+ let signer = AwsV4Signer::new("s3", ®ion);
+
+ debug!("backend build finished");
+ Ok(WasabiBackend {
+ core: Arc::new(WasabiCore {
+ bucket: bucket.to_string(),
+ endpoint,
+ root,
+ server_side_encryption,
+ server_side_encryption_aws_kms_key_id,
+ server_side_encryption_customer_algorithm,
+ server_side_encryption_customer_key,
+ server_side_encryption_customer_key_md5,
+ default_storage_class,
+ signer,
+ loader,
+ client,
+ }),
+ })
+ }
+}
+
+/// Backend for wasabi service.
+#[derive(Debug, Clone)]
+pub struct WasabiBackend {
+ core: Arc<WasabiCore>,
+}
+
+#[async_trait]
+impl Accessor for WasabiBackend {
+ type Reader = IncomingAsyncBody;
+ type BlockingReader = ();
+ type Writer = WasabiWriter;
+ type BlockingWriter = ();
+ type Pager = WasabiPager;
+ type BlockingPager = ();
+
+ fn info(&self) -> AccessorInfo {
+ use AccessorCapability::*;
+ use AccessorHint::*;
+
+ let mut am = AccessorInfo::default();
+ am.set_scheme(Scheme::Wasabi)
+ .set_root(&self.core.root)
+ .set_name(&self.core.bucket)
+ .set_max_batch_operations(1000)
+ .set_capabilities(Read | Write | List | Scan | Presign | Batch |
Copy | Rename)
+ .set_hints(ReadStreamable);
+
+ am
+ }
+
+ async fn create(&self, path: &str, _: OpCreate) -> Result<RpCreate> {
+ let mut req =
+ self.core
+ .put_object_request(path, Some(0), None, None, None,
AsyncBody::Empty)?;
+
+ self.core.sign(&mut req).await?;
+
+ let resp = self.core.send(req).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::CREATED | StatusCode::OK => {
+ resp.into_body().consume().await?;
+ Ok(RpCreate::default())
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::Reader)> {
+ let resp = self
+ .core
+ .get_object(path, args.range(), args.if_none_match())
+ .await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
+ let meta = parse_into_metadata(path, resp.headers())?;
+ Ok((RpRead::with_metadata(meta), resp.into_body()))
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
+ Ok((
+ RpWrite::default(),
+ WasabiWriter::new(self.core.clone(), args, path.to_string(), None),
+ ))
+ }
+
+ async fn copy(&self, from: &str, to: &str, _args: OpCopy) ->
Result<RpCopy> {
+ let resp = self.core.copy_object(from, to).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => {
+                // According to the documentation, a CopyObject request may
+                // return 200 OK with an error in the response body, so we need to detect it.
+ //
https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html#API_CopyObject_RequestSyntax
+ resp.into_body().consume().await?;
+
+ Ok(RpCopy::default())
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+ // Stat root always returns a DIR.
+ if path == "/" {
+ return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
+ }
+
+ let resp = self.core.head_object(path, args.if_none_match()).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => parse_into_metadata(path,
resp.headers()).map(RpStat::new),
+ StatusCode::NOT_FOUND if path.ends_with('/') => {
+ Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
+ let resp = self.core.delete_object(path).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::NO_CONTENT => Ok(RpDelete::default()),
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Pager)> {
+ Ok((
+ RpList::default(),
+ WasabiPager::new(self.core.clone(), path, "/", args.limit()),
+ ))
+ }
+
+ async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan,
Self::Pager)> {
+ Ok((
+ RpScan::default(),
+ WasabiPager::new(self.core.clone(), path, "", args.limit()),
+ ))
+ }
+
+ async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
+ // We will not send this request out, just for signing.
+ let mut req = match args.operation() {
+ PresignOperation::Stat(v) => self.core.head_object_request(path,
v.if_none_match())?,
+ PresignOperation::Read(v) => self.core.get_object_request(
+ path,
+ v.range(),
+ v.override_content_disposition(),
+ v.override_cache_control(),
+ v.if_none_match(),
+ )?,
+ PresignOperation::Write(_) => {
+ self.core
+ .put_object_request(path, None, None, None, None,
AsyncBody::Empty)?
+ }
+ };
+
+ self.core.sign_query(&mut req, args.expire()).await?;
+
+ // We don't need this request anymore, consume it directly.
+ let (parts, _) = req.into_parts();
+
+ Ok(RpPresign::new(PresignedRequest::new(
+ parts.method,
+ parts.uri,
+ parts.headers,
+ )))
+ }
+
+ async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
+ let ops = args.into_operation();
+ if ops.len() > 1000 {
+ return Err(Error::new(
+ ErrorKind::Unsupported,
+ "s3 services only allow delete up to 1000 keys at once",
+ )
+ .with_context("length", ops.len().to_string()));
+ }
+
+ let paths = ops.into_iter().map(|(p, _)| p).collect();
+
+ let resp = self.core.delete_objects(paths).await?;
+
+ let status = resp.status();
+
+ if let StatusCode::OK = status {
+ let bs = resp.into_body().bytes().await?;
+
+ let result: DeleteObjectsResult =
+
quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
+
+ let mut batched_result = Vec::with_capacity(result.deleted.len() +
result.error.len());
+ for i in result.deleted {
+ let path = build_rel_path(&self.core.root, &i.key);
+ batched_result.push((path, Ok(RpDelete::default().into())));
+ }
+ // TODO: we should handle those errors with code.
+ for i in result.error {
+ let path = build_rel_path(&self.core.root, &i.key);
+
+ batched_result.push((
+ path,
+ Err(Error::new(ErrorKind::Unexpected, &format!("{i:?}"))),
+ ));
+ }
+
+ Ok(RpBatch::new(batched_result))
+ } else {
+ Err(parse_error(resp).await?)
+ }
+ }
+
+    /// Execute the rename API call.
+    /// Wasabi will auto-create any missing path segments for the destination `to`.
+ async fn rename(&self, from: &str, to: &str, _args: OpRename) ->
Result<RpRename> {
+ let resp = self.core.rename_object(from, to).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => Ok(RpRename::default()),
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_is_valid_bucket() {
+ let bucket_cases = vec![
+ ("", false, false),
+ ("test", false, true),
+ ("test.xyz", false, true),
+ ("", true, false),
+ ("test", true, true),
+ ("test.xyz", true, false),
+ ];
+
+ for (bucket, enable_virtual_host_style, expected) in bucket_cases {
+ let mut b = WasabiBuilder::default();
+ b.bucket(bucket);
+ if enable_virtual_host_style {
+ b.enable_virtual_host_style();
+ }
+ assert_eq!(b.is_bucket_valid(), expected)
+ }
+ }
+
+ #[test]
+ fn test_build_endpoint() {
+ let _ = env_logger::try_init();
+
+ let endpoint_cases = vec![
+ Some("s3.wasabisys.com"),
+ Some("https://s3.wasabisys.com"),
+ Some("https://s3.us-east-2.amazonaws.com"),
+ None,
+ ];
+
+ for endpoint in &endpoint_cases {
+ let mut b = WasabiBuilder::default();
+ b.bucket("test");
+ if let Some(endpoint) = endpoint {
+ b.endpoint(endpoint);
+ }
+
+ let endpoint = b.build_endpoint("us-east-2");
+ assert_eq!(endpoint, "https://s3.us-east-2.wasabisys.com/test");
+ }
+
+ for endpoint in &endpoint_cases {
+ let mut b = WasabiBuilder::default();
+ b.bucket("test");
+ b.enable_virtual_host_style();
+ if let Some(endpoint) = endpoint {
+ b.endpoint(endpoint);
+ }
+
+ let endpoint = b.build_endpoint("us-east-2");
+ assert_eq!(endpoint, "https://test.s3.us-east-2.wasabisys.com");
+ }
+ }
+}
diff --git a/core/src/services/s3/core.rs b/core/src/services/wasabi/core.rs
similarity index 89%
copy from core/src/services/s3/core.rs
copy to core/src/services/wasabi/core.rs
index 4f099188..4fdc385f 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/wasabi/core.rs
@@ -24,11 +24,11 @@ use std::time::Duration;
use backon::ExponentialBuilder;
use backon::Retryable;
use bytes::Bytes;
+use http::header::HeaderName;
use http::header::CACHE_CONTROL;
use http::header::CONTENT_DISPOSITION;
use http::header::CONTENT_LENGTH;
use http::header::CONTENT_TYPE;
-use http::header::{HeaderName, IF_NONE_MATCH};
use http::HeaderValue;
use http::Request;
use http::Response;
@@ -56,21 +56,27 @@ mod constants {
"x-amz-server-side-encryption-aws-kms-key-id";
pub const X_AMZ_STORAGE_CLASS: &str = "x-amz-storage-class";
+ #[allow(dead_code)]
pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM:
&str =
"x-amz-copy-source-server-side-encryption-customer-algorithm";
+ #[allow(dead_code)]
pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: &str =
"x-amz-copy-source-server-side-encryption-customer-key";
+ #[allow(dead_code)]
pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: &str =
"x-amz-copy-source-server-side-encryption-customer-key-md5";
pub const RESPONSE_CONTENT_DISPOSITION: &str =
"response-content-disposition";
pub const RESPONSE_CACHE_CONTROL: &str = "response-cache-control";
+
+ pub const DESTINATION: &str = "Destination";
+ pub const OVERWRITE: &str = "Overwrite";
}
static BACKOFF: Lazy<ExponentialBuilder> =
Lazy::new(|| ExponentialBuilder::default().with_jitter());
-pub struct S3Core {
+pub struct WasabiCore {
pub bucket: String,
pub endpoint: String,
pub root: String,
@@ -86,9 +92,9 @@ pub struct S3Core {
pub client: HttpClient,
}
-impl Debug for S3Core {
+impl Debug for WasabiCore {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
- f.debug_struct("S3Core")
+ f.debug_struct("WasabiCore")
.field("bucket", &self.bucket)
.field("endpoint", &self.endpoint)
.field("root", &self.root)
@@ -96,7 +102,7 @@ impl Debug for S3Core {
}
}
-impl S3Core {
+impl WasabiCore {
/// If credential is not found, we will not sign the request.
async fn load_credential(&self) -> Result<Option<AwsCredential>> {
let cred = { || self.loader.load() }
@@ -141,7 +147,7 @@ impl S3Core {
/// # Note
///
/// header like X_AMZ_SERVER_SIDE_ENCRYPTION doesn't need to set while
- // get or stat.
+ /// get or stat.
pub fn insert_sse_headers(
&self,
mut req: http::request::Builder,
@@ -200,8 +206,8 @@ impl S3Core {
}
}
-impl S3Core {
- pub fn s3_head_object_request(
+impl WasabiCore {
+ pub fn head_object_request(
&self,
path: &str,
if_none_match: Option<&str>,
@@ -215,7 +221,7 @@ impl S3Core {
req = self.insert_sse_headers(req, false);
if let Some(if_none_match) = if_none_match {
- req = req.header(IF_NONE_MATCH, if_none_match);
+ req = req.header(http::header::IF_NONE_MATCH, if_none_match);
}
let req = req
@@ -225,7 +231,7 @@ impl S3Core {
Ok(req)
}
- pub fn s3_get_object_request(
+ pub fn get_object_request(
&self,
path: &str,
range: BytesRange,
@@ -265,7 +271,7 @@ impl S3Core {
}
if let Some(if_none_match) = if_none_match {
- req = req.header(IF_NONE_MATCH, if_none_match);
+ req = req.header(http::header::IF_NONE_MATCH, if_none_match);
}
// Set SSE headers.
@@ -279,20 +285,20 @@ impl S3Core {
Ok(req)
}
- pub async fn s3_get_object(
+ pub async fn get_object(
&self,
path: &str,
range: BytesRange,
if_none_match: Option<&str>,
) -> Result<Response<IncomingAsyncBody>> {
- let mut req = self.s3_get_object_request(path, range, None, None,
if_none_match)?;
+ let mut req = self.get_object_request(path, range, None, None,
if_none_match)?;
self.sign(&mut req).await?;
self.send(req).await
}
- pub fn s3_put_object_request(
+ pub fn put_object_request(
&self,
path: &str,
size: Option<usize>,
@@ -337,34 +343,19 @@ impl S3Core {
Ok(req)
}
- pub async fn s3_head_object(
+ pub async fn head_object(
&self,
path: &str,
if_none_match: Option<&str>,
) -> Result<Response<IncomingAsyncBody>> {
- let p = build_abs_path(&self.root, path);
-
- let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
-
- let mut req = Request::head(&url);
-
- // Set SSE headers.
- req = self.insert_sse_headers(req, false);
-
- if let Some(if_none_match) = if_none_match {
- req = req.header(IF_NONE_MATCH, if_none_match);
- }
-
- let mut req = req
- .body(AsyncBody::Empty)
- .map_err(new_request_build_error)?;
+ let mut req = self.head_object_request(path, if_none_match)?;
self.sign(&mut req).await?;
self.send(req).await
}
- pub async fn s3_delete_object(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
+ pub async fn delete_object(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);
let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
@@ -378,11 +369,7 @@ impl S3Core {
self.send(req).await
}
- pub async fn s3_copy_object(
- &self,
- from: &str,
- to: &str,
- ) -> Result<Response<IncomingAsyncBody>> {
+ pub async fn copy_object(&self, from: &str, to: &str) ->
Result<Response<IncomingAsyncBody>> {
let from = build_abs_path(&self.root, from);
let to = build_abs_path(&self.root, to);
@@ -442,7 +429,7 @@ impl S3Core {
     /// Make this function `pub(super)` because `DirStream` depends
     /// on it.
- pub async fn s3_list_objects(
+ pub async fn list_objects(
&self,
path: &str,
continuation_token: &str,
@@ -481,7 +468,7 @@ impl S3Core {
self.send(req).await
}
- pub async fn s3_initiate_multipart_upload(
+ pub async fn initiate_multipart_upload(
&self,
path: &str,
content_type: Option<&str>,
@@ -523,7 +510,7 @@ impl S3Core {
self.send(req).await
}
- pub fn s3_upload_part_request(
+ pub fn upload_part_request(
&self,
path: &str,
upload_id: &str,
@@ -556,7 +543,7 @@ impl S3Core {
Ok(req)
}
- pub async fn s3_complete_multipart_upload(
+ pub async fn complete_multipart_upload(
&self,
path: &str,
upload_id: &str,
@@ -595,7 +582,7 @@ impl S3Core {
}
/// Abort an on-going multipart upload.
- pub async fn s3_abort_multipart_upload(
+ pub async fn abort_multipart_upload(
&self,
path: &str,
upload_id: &str,
@@ -616,10 +603,7 @@ impl S3Core {
self.send(req).await
}
- pub async fn s3_delete_objects(
- &self,
- paths: Vec<String>,
- ) -> Result<Response<IncomingAsyncBody>> {
+ pub async fn delete_objects(&self, paths: Vec<String>) ->
Result<Response<IncomingAsyncBody>> {
let url = format!("{}/?delete", self.endpoint);
let req = Request::post(&url);
@@ -649,6 +633,82 @@ impl S3Core {
self.send(req).await
}
+
+ pub async fn put_object(
+ &self,
+ path: &str,
+ size: Option<usize>,
+ content_type: Option<&str>,
+ content_disposition: Option<&str>,
+ cache_control: Option<&str>,
+ body: AsyncBody,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let mut req = self.put_object_request(
+ path,
+ size,
+ content_type,
+ content_disposition,
+ cache_control,
+ body,
+ )?;
+
+ self.sign(&mut req).await?;
+
+ self.send(req).await
+ }
+
+ pub async fn rename_object(&self, from: &str, to: &str) ->
Result<Response<IncomingAsyncBody>> {
+ let from = percent_encode_path(build_abs_path(&self.root,
from).as_str());
+ let to = percent_encode_path(build_abs_path(&self.root, to).as_str());
+
+ let url = format!("{}/{}", self.endpoint, from);
+
+ let mut req = Request::builder().method("MOVE").uri(url);
+
+ // Set SSE headers.
+ req = self.insert_sse_headers(req, true);
+
+ let mut req = req
+ .header(constants::DESTINATION, to)
+ .header(constants::OVERWRITE, "true")
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.sign(&mut req).await?;
+
+ self.send(req).await
+ }
+
+ pub async fn append_object(
+ &self,
+ path: &str,
+ size: Option<usize>,
+ body: AsyncBody,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let p = build_abs_path(&self.root, path);
+
+ let url = format!("{}/{}?append", self.endpoint,
percent_encode_path(&p));
+
+ let mut req = Request::put(&url);
+
+ if let Some(size) = size {
+ req = req.header(CONTENT_LENGTH, size)
+ }
+
+ // Set storage class header
+ if let Some(v) = &self.default_storage_class {
+ req =
req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v);
+ }
+
+ // Set SSE headers.
+ req = self.insert_sse_headers(req, true);
+
+ let mut req = req.body(body).map_err(new_request_build_error)?;
+
+ self.sign(&mut req).await?;
+
+ self.send(req).await
+ }
}
/// Result of CreateMultipartUpload
diff --git a/core/src/services/wasabi/error.rs
b/core/src/services/wasabi/error.rs
new file mode 100644
index 00000000..1eb5407e
--- /dev/null
+++ b/core/src/services/wasabi/error.rs
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Buf;
+use http::Response;
+use http::StatusCode;
+use quick_xml::de;
+use serde::Deserialize;
+
+use crate::raw::*;
+use crate::Error;
+use crate::ErrorKind;
+use crate::Result;
+
+/// WasabiError is the error returned by wasabi service.
+#[derive(Default, Debug, Deserialize)]
+#[serde(default, rename_all = "PascalCase")]
+struct WasabiError {
+ code: String,
+ message: String,
+ resource: String,
+ request_id: String,
+}
+
+/// Parse error response into Error.
+pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
+ let (parts, body) = resp.into_parts();
+ let bs = body.bytes().await?;
+
+ let (mut kind, mut retryable) = match parts.status {
+ StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
+ StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
+ StatusCode::PRECONDITION_FAILED => (ErrorKind::PreconditionFailed,
false),
+ StatusCode::INTERNAL_SERVER_ERROR
+ | StatusCode::BAD_GATEWAY
+ | StatusCode::SERVICE_UNAVAILABLE
+ | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true),
+ _ => (ErrorKind::Unexpected, false),
+ };
+
+ let (message, wasabi_err) = de::from_reader::<_,
WasabiError>(bs.clone().reader())
+ .map(|err| (format!("{err:?}"), Some(err)))
+ .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None));
+
+ // All possible error code:
<https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList>
+ if let Some(wasabi_err) = wasabi_err {
+ (kind, retryable) = match wasabi_err.code.as_str() {
+            // > Your socket connection to the server was not read from
+            // > or written to within the timeout period.
+ //
+ // It's Ok for us to retry it again.
+ "RequestTimeout" => (ErrorKind::Unexpected, true),
+ _ => (kind, retryable),
+ }
+ }
+
+ let mut err = Error::new(kind, &message).with_context("response",
format!("{parts:?}"));
+
+ if retryable {
+ err = err.set_temporary();
+ }
+
+ Ok(err)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ /// Error response example is from
https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
+ #[test]
+ fn test_parse_error() {
+ let bs = bytes::Bytes::from(
+ r#"
+<?xml version="1.0" encoding="UTF-8"?>
+<Error>
+ <Code>NoSuchKey</Code>
+ <Message>The resource you requested does not exist</Message>
+ <Resource>/mybucket/myfoto.jpg</Resource>
+ <RequestId>4442587FB7D0A2F9</RequestId>
+</Error>
+"#,
+ );
+
+ let out: WasabiError = de::from_reader(bs.reader()).expect("must
success");
+ println!("{out:?}");
+
+ assert_eq!(out.code, "NoSuchKey");
+ assert_eq!(out.message, "The resource you requested does not exist");
+ assert_eq!(out.resource, "/mybucket/myfoto.jpg");
+ assert_eq!(out.request_id, "4442587FB7D0A2F9");
+ }
+}
diff --git a/core/src/services/wasabi/mod.rs b/core/src/services/wasabi/mod.rs
new file mode 100644
index 00000000..7356abc0
--- /dev/null
+++ b/core/src/services/wasabi/mod.rs
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod backend;
+pub use backend::WasabiBuilder as Wasabi;
+
+mod core;
+mod error;
+mod pager;
+mod writer;
diff --git a/core/src/services/wasabi/pager.rs
b/core/src/services/wasabi/pager.rs
new file mode 100644
index 00000000..038d69a1
--- /dev/null
+++ b/core/src/services/wasabi/pager.rs
@@ -0,0 +1,231 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::Buf;
+use quick_xml::de;
+use serde::Deserialize;
+
+use super::core::WasabiCore;
+use super::error::parse_error;
+use crate::raw::*;
+use crate::EntryMode;
+use crate::Metadata;
+use crate::Result;
+
+pub struct WasabiPager {
+ core: Arc<WasabiCore>,
+
+ path: String,
+ delimiter: String,
+ limit: Option<usize>,
+
+ token: String,
+ done: bool,
+}
+
+impl WasabiPager {
+ pub fn new(core: Arc<WasabiCore>, path: &str, delimiter: &str, limit:
Option<usize>) -> Self {
+ Self {
+ core,
+
+ path: path.to_string(),
+ delimiter: delimiter.to_string(),
+ limit,
+
+ token: "".to_string(),
+ done: false,
+ }
+ }
+}
+
+#[async_trait]
+impl oio::Page for WasabiPager {
+ async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
+ if self.done {
+ return Ok(None);
+ }
+
+ let resp = self
+ .core
+ .list_objects(&self.path, &self.token, &self.delimiter, self.limit)
+ .await?;
+
+ if resp.status() != http::StatusCode::OK {
+ return Err(parse_error(resp).await?);
+ }
+
+ let bs = resp.into_body().bytes().await?;
+
+ let output: Output =
de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
+
+ // Try our best to check whether this list is done.
+ //
+ // - Check `is_truncated`
+ // - Check `next_continuation_token`
+        // - Check the length of `common_prefixes` and `contents` (very rare case)
+ self.done = if let Some(is_truncated) = output.is_truncated {
+ !is_truncated
+ } else if let Some(next_continuation_token) =
output.next_continuation_token.as_ref() {
+ next_continuation_token.is_empty()
+ } else {
+ output.common_prefixes.is_empty() && output.contents.is_empty()
+ };
+ self.token =
output.next_continuation_token.clone().unwrap_or_default();
+
+ let mut entries = Vec::with_capacity(output.common_prefixes.len() +
output.contents.len());
+
+ for prefix in output.common_prefixes {
+ let de = oio::Entry::new(
+ &build_rel_path(&self.core.root, &prefix.prefix),
+ Metadata::new(EntryMode::DIR),
+ );
+
+ entries.push(de);
+ }
+
+ for object in output.contents {
+            // The service could return the dir itself in contents,
+            // whose key ends with `/`.
+ // We should ignore them.
+ if object.key.ends_with('/') {
+ continue;
+ }
+
+ let mut meta = Metadata::new(EntryMode::FILE);
+
+ meta.set_etag(&object.etag);
+ meta.set_content_md5(object.etag.trim_matches('"'));
+ meta.set_content_length(object.size);
+
+            // object.last_modified provides a more precise timestamp that
+            // includes nanoseconds.
+
meta.set_last_modified(parse_datetime_from_rfc3339(object.last_modified.as_str())?);
+
+ let de = oio::Entry::new(&build_rel_path(&self.core.root,
&object.key), meta);
+
+ entries.push(de);
+ }
+
+ Ok(Some(entries))
+ }
+}
+
+/// Output of ListBucket/ListObjects.
+///
+/// ## Note
+///
+/// Use `Option` in `is_truncated` and `next_continuation_token` to make
+/// the behavior more clear so that we can be compatible to more s3 services.
+///
+/// And enable `serde(default)` so that we can keep going even when some field
+/// is not exist.
+#[derive(Default, Debug, Deserialize)]
+#[serde(default, rename_all = "PascalCase")]
+struct Output {
+ is_truncated: Option<bool>,
+ next_continuation_token: Option<String>,
+ common_prefixes: Vec<OutputCommonPrefix>,
+ contents: Vec<OutputContent>,
+}
+
+#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+struct OutputContent {
+ key: String,
+ size: u64,
+ last_modified: String,
+ #[serde(rename = "ETag")]
+ etag: String,
+}
+
+#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+struct OutputCommonPrefix {
+ prefix: String,
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_parse_list_output() {
+ let bs = bytes::Bytes::from(
+ r#"<ListBucketResult
xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+ <Name>example-bucket</Name>
+ <Prefix>photos/2006/</Prefix>
+ <KeyCount>3</KeyCount>
+ <MaxKeys>1000</MaxKeys>
+ <Delimiter>/</Delimiter>
+ <IsTruncated>false</IsTruncated>
+ <Contents>
+ <Key>photos/2006</Key>
+ <LastModified>2016-04-30T23:51:29.000Z</LastModified>
+ <ETag>"d41d8cd98f00b204e9800998ecf8427e"</ETag>
+ <Size>56</Size>
+ <StorageClass>STANDARD</StorageClass>
+ </Contents>
+ <Contents>
+ <Key>photos/2007</Key>
+ <LastModified>2016-04-30T23:51:29.000Z</LastModified>
+ <ETag>"d41d8cd98f00b204e9800998ecf8427e"</ETag>
+ <Size>100</Size>
+ <StorageClass>STANDARD</StorageClass>
+ </Contents>
+
+ <CommonPrefixes>
+ <Prefix>photos/2006/February/</Prefix>
+ </CommonPrefixes>
+ <CommonPrefixes>
+ <Prefix>photos/2006/January/</Prefix>
+ </CommonPrefixes>
+</ListBucketResult>"#,
+ );
+
+ let out: Output = de::from_reader(bs.reader()).expect("must success");
+
+ assert!(!out.is_truncated.unwrap());
+ assert!(out.next_continuation_token.is_none());
+ assert_eq!(
+ out.common_prefixes
+ .iter()
+ .map(|v| v.prefix.clone())
+ .collect::<Vec<String>>(),
+ vec!["photos/2006/February/", "photos/2006/January/"]
+ );
+ assert_eq!(
+ out.contents,
+ vec![
+ OutputContent {
+ key: "photos/2006".to_string(),
+ size: 56,
+ etag: "\"d41d8cd98f00b204e9800998ecf8427e\"".to_string(),
+ last_modified: "2016-04-30T23:51:29.000Z".to_string(),
+ },
+ OutputContent {
+ key: "photos/2007".to_string(),
+ size: 100,
+ last_modified: "2016-04-30T23:51:29.000Z".to_string(),
+ etag: "\"d41d8cd98f00b204e9800998ecf8427e\"".to_string(),
+ }
+ ]
+ )
+ }
+}
diff --git a/core/src/services/wasabi/writer.rs
b/core/src/services/wasabi/writer.rs
new file mode 100644
index 00000000..90d193df
--- /dev/null
+++ b/core/src/services/wasabi/writer.rs
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use http::StatusCode;
+
+use super::core::*;
+use super::error::parse_error;
+use crate::ops::OpWrite;
+use crate::raw::*;
+use crate::*;
+
+pub struct WasabiWriter {
+ core: Arc<WasabiCore>,
+
+ op: OpWrite,
+ path: String,
+
+ upload_id: Option<String>,
+}
+
+impl WasabiWriter {
+ pub fn new(
+ core: Arc<WasabiCore>,
+ op: OpWrite,
+ path: String,
+ upload_id: Option<String>,
+ ) -> Self {
+ WasabiWriter {
+ core,
+
+ op,
+ path,
+ upload_id,
+ }
+ }
+}
+
+#[async_trait]
+impl oio::Write for WasabiWriter {
+ async fn write(&mut self, bs: Bytes) -> Result<()> {
+ debug_assert!(
+ self.upload_id.is_none(),
+ "Writer initiated with upload id, but users trying to call write,
must be buggy"
+ );
+
+ let resp = self
+ .core
+ .put_object(
+ &self.path,
+ Some(bs.len()),
+ self.op.content_type(),
+ self.op.content_disposition(),
+ self.op.cache_control(),
+ AsyncBody::Bytes(bs),
+ )
+ .await?;
+
+ match resp.status() {
+ StatusCode::CREATED | StatusCode::OK => {
+ resp.into_body().consume().await?;
+ Ok(())
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn append(&mut self, bs: Bytes) -> Result<()> {
+ let resp = self
+ .core
+ .append_object(&self.path, Some(bs.len()), AsyncBody::Bytes(bs))
+ .await?;
+
+ match resp.status() {
+ StatusCode::CREATED | StatusCode::OK | StatusCode::NO_CONTENT => {
+ resp.into_body().consume().await?;
+ Ok(())
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn abort(&mut self) -> Result<()> {
+ Ok(())
+ }
+
+ async fn close(&mut self) -> Result<()> {
+ Ok(())
+ }
+}
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index c5c1b1d9..aff72868 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -71,6 +71,8 @@ pub enum Scheme {
S3,
/// [sled][crate::services::Sled]: Sled services
Sled,
+ /// [wasabi][crate::services::Wasabi]: Wasabi service
+ Wasabi,
/// [webdav][crate::services::Webdav]: WebDAV support.
Webdav,
/// [webhdfs][crate::services::Webhdfs]: WebHDFS RESTful API Services
@@ -129,6 +131,7 @@ impl FromStr for Scheme {
"s3" => Ok(Scheme::S3),
"sled" => Ok(Scheme::Sled),
"oss" => Ok(Scheme::Oss),
+ "wasabi" => Ok(Scheme::Wasabi),
"webdav" => Ok(Scheme::Webdav),
"webhdfs" => Ok(Scheme::Webhdfs),
_ => Ok(Scheme::Custom(Box::leak(s.into_boxed_str()))),
@@ -159,6 +162,7 @@ impl From<Scheme> for &'static str {
Scheme::S3 => "s3",
Scheme::Sled => "sled",
Scheme::Oss => "oss",
+ Scheme::Wasabi => "wasabi",
Scheme::Webdav => "webdav",
Scheme::Webhdfs => "webhdfs",
Scheme::Custom(v) => v,