This is an automated email from the ASF dual-hosted git repository.
suyanhanx pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new d119ea33 feat: Add COS service support (#2269)
d119ea33 is described below
commit d119ea33380d4fa092878f03f135d336cd8bb037
Author: Xuanwo <[email protected]>
AuthorDate: Thu May 18 22:00:40 2023 +0800
feat: Add COS service support (#2269)
Signed-off-by: Xuanwo <[email protected]>
---
.env.example | 6 +
Cargo.lock | 31 +--
core/Cargo.toml | 8 +-
core/src/services/azblob/backend.rs | 1 +
core/src/services/azdfs/backend.rs | 1 +
core/src/services/cos/backend.rs | 468 ++++++++++++++++++++++++++++++++++++
core/src/services/cos/core.rs | 290 ++++++++++++++++++++++
core/src/services/cos/error.rs | 106 ++++++++
core/src/services/cos/mod.rs | 24 ++
core/src/services/cos/pager.rs | 210 ++++++++++++++++
core/src/services/cos/writer.rs | 76 ++++++
core/src/services/mod.rs | 5 +
core/src/types/scheme.rs | 4 +
core/tests/behavior/main.rs | 2 +
14 files changed, 1216 insertions(+), 16 deletions(-)
diff --git a/.env.example b/.env.example
index d6e64ed2..7c5e02e4 100644
--- a/.env.example
+++ b/.env.example
@@ -4,6 +4,12 @@ OPENDAL_MEMORY_TEST=on
OPENDAL_FS_TEST=false
OPENDAL_FS_ROOT=/path/to/dir
OPENDAL_FS_ATOMIC_WRITE_DIR=/path/to/tempdir
+# cos
+OPENDAL_COS_TEST=false
+OPENDAL_COS_BUCKET=opendal-testing-1318209832
+OPENDAL_COS_ENDPOINT=https://cos.ap-singapore.myqcloud.com
+OPENDAL_COS_SECRET_ID=<secret_id>
+OPENDAL_COS_SECRET_KEY=<secret_key>
# s3
OPENDAL_S3_TEST=false
OPENDAL_S3_BUCKET=<bucket>
diff --git a/Cargo.lock b/Cargo.lock
index bf0603b3..f05e4547 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1103,9 +1103,9 @@ checksum =
"eaa37046cc0f6c3cc6090fbdbf73ef0b8ef4cfcc37f6befc0020f63e8cf121e1"
[[package]]
name = "der"
-version = "0.6.1"
+version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f1a467a65c5e759bce6e65eaf91cc29f466cdc57cb65777bd646872a8a1fd4de"
+checksum = "56acb310e15652100da43d130af8d97b509e95af61aab1c5a7939ef24337ee17"
dependencies = [
"const-oid",
"pem-rfc7468",
@@ -3088,9 +3088,9 @@ dependencies = [
[[package]]
name = "pem-rfc7468"
-version = "0.6.0"
+version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "24d159833a9105500e0398934e205e0773f0b27529557134ecfc51c27646adac"
+checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412"
dependencies = [
"base64ct",
]
@@ -3135,21 +3135,20 @@ checksum =
"8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkcs1"
-version = "0.4.1"
+version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "eff33bdbdfc54cc98a2eca766ebdec3e1b8fb7387523d5c9c9a2891da856f719"
+checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f"
dependencies = [
"der",
"pkcs8",
"spki",
- "zeroize",
]
[[package]]
name = "pkcs8"
-version = "0.9.0"
+version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9eca2c590a5f85da82668fa685c09ce2888b9430e83299debf1f34b65fd4a4ba"
+checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7"
dependencies = [
"der",
"spki",
@@ -3715,9 +3714,9 @@ checksum =
"456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848"
[[package]]
name = "reqsign"
-version = "0.10.1"
+version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5d36dae58fbc1db90e1cf14ca8fabcba92b7aa3c282d5e46bcdf16b9c28ab04c"
+checksum = "0ef945b588044afa4ee3a87d3cbfadd8c47304e8a6fe17f7b9976aa215a47a5f"
dependencies = [
"anyhow",
"async-trait",
@@ -3832,11 +3831,12 @@ dependencies = [
[[package]]
name = "rsa"
-version = "0.8.2"
+version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "55a77d189da1fee555ad95b7e50e7457d91c0e089ec68ca69ad2989413bbdab4"
+checksum = "6ab43bb47d23c1a631b4b680199a45255dce26fa9ab2fa902581f624ff13e6a8"
dependencies = [
"byteorder",
+ "const-oid",
"digest",
"num-bigint-dig",
"num-integer",
@@ -3846,6 +3846,7 @@ dependencies = [
"pkcs8",
"rand_core 0.6.4",
"signature",
+ "spki",
"subtle",
"zeroize",
]
@@ -4297,9 +4298,9 @@ dependencies = [
[[package]]
name = "spki"
-version = "0.6.0"
+version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "67cf02bbac7a337dc36e4f5a693db6c21e7863f45070f7064577eb4367a3212b"
+checksum = "9d1e996ef02c474957d681f1b05213dfb0abab947b446a62d37770b23500184a"
dependencies = [
"base64ct",
"der",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index ccbc2451..e8b23005 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -38,6 +38,7 @@ default = [
"rustls",
"services-azblob",
"services-azdfs",
+ "services-cos",
"services-fs",
"services-gcs",
"services-ghac",
@@ -100,6 +101,11 @@ services-azdfs = [
"reqsign?/services-azblob",
"reqsign?/reqwest_request",
]
+services-cos = [
+ "dep:reqsign",
+ "reqsign?/services-tencent",
+ "reqsign?/reqwest_request",
+]
services-dashmap = ["dep:dashmap"]
services-fs = ["tokio/fs"]
services-ftp = ["dep:suppaftp", "dep:lazy-regex", "dep:bb8", "dep:async-tls"]
@@ -196,7 +202,7 @@ redis = { version = "0.22", features = [
"tokio-comp",
"connection-manager",
], optional = true }
-reqsign = { version = "0.10.1", default-features = false, optional = true }
+reqsign = { version = "0.11.0", default-features = false, optional = true }
reqwest = { version = "0.11.13", features = [
"stream",
], default-features = false }
diff --git a/core/src/services/azblob/backend.rs
b/core/src/services/azblob/backend.rs
index 9712c4c9..a4c5f68a 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -388,6 +388,7 @@ impl Builder for AzblobBuilder {
.or_else(||
infer_storage_name_from_endpoint(endpoint.as_str())),
account_key: self.account_key.clone(),
sas_token: self.sas_token.clone(),
+ ..Default::default()
};
let cred_loader = AzureStorageLoader::new(config_loader);
diff --git a/core/src/services/azdfs/backend.rs
b/core/src/services/azdfs/backend.rs
index d428a780..97bffbe9 100644
--- a/core/src/services/azdfs/backend.rs
+++ b/core/src/services/azdfs/backend.rs
@@ -261,6 +261,7 @@ impl Builder for AzdfsBuilder {
.or_else(||
infer_storage_name_from_endpoint(endpoint.as_str())),
account_key: self.account_key.clone(),
sas_token: None,
+ ..Default::default()
};
let cred_loader = AzureStorageLoader::new(config_loader);
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
new file mode 100644
index 00000000..5ca4e418
--- /dev/null
+++ b/core/src/services/cos/backend.rs
@@ -0,0 +1,468 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use http::StatusCode;
+use http::Uri;
+use log::debug;
+use reqsign::TencentCosConfig;
+use reqsign::TencentCosCredentialLoader;
+use reqsign::TencentCosSigner;
+
+use super::core::CosCore;
+use super::error::parse_error;
+use super::pager::CosPager;
+use super::writer::CosWriter;
+use crate::ops::*;
+use crate::raw::*;
+use crate::*;
+
+/// Tencent Cloud COS services support.
+///
+/// # Capabilities
+///
+/// This service can be used to:
+///
+/// - [x] stat
+/// - [x] read
+/// - [x] write
+/// - [x] create_dir
+/// - [x] delete
+/// - [x] copy
+/// - [ ] rename
+/// - [x] list
+/// - [x] scan
+/// - [x] presign
+/// - [ ] blocking
+///
+/// # Configuration
+///
+/// - `root`: Set the work directory for backend
+/// - `bucket`: Set the bucket name for backend
+/// - `endpoint`: Customizable endpoint setting
+/// - `secret_id`: Set the secret_id for backend.
+/// - `secret_key`: Set the secret_key for backend.
+///
+/// You can refer to [`CosBuilder`]'s docs for more information
+///
+/// # Example
+///
+/// ## Via Builder
+///
+/// ```no_run
+/// use anyhow::Result;
+/// use opendal::services::Cos;
+/// use opendal::Operator;
+///
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+/// // create backend builder
+/// let mut builder = Cos::default();
+///
+/// // set the storage bucket for OpenDAL
+/// builder.bucket("test");
+/// // set the endpoint for OpenDAL
+/// builder.endpoint("https://cos.ap-singapore.myqcloud.com");
+/// // Set the secret_id and secret_key.
+/// //
+/// // OpenDAL will try to load the credential from the env.
+/// // If no credential is set and no valid credential is found in env, OpenDAL will
+/// // send requests without signing, like an anonymous user.
+/// builder.secret_id("secret_id");
+/// builder.secret_key("secret_access_key");
+///
+/// let op: Operator = Operator::new(builder)?.finish();
+///
+/// Ok(())
+/// }
+/// ```
+#[derive(Default, Clone)]
+pub struct CosBuilder {
+ root: Option<String>,
+ endpoint: Option<String>,
+ secret_id: Option<String>,
+ secret_key: Option<String>,
+ bucket: Option<String>,
+ http_client: Option<HttpClient>,
+}
+
+impl Debug for CosBuilder {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("Builder")
+ .field("root", &self.root)
+ .field("endpoint", &self.endpoint)
+ .field("secret_id", &"<redacted>")
+ .field("secret_key", &"<redacted>")
+ .field("bucket", &self.bucket)
+ .finish()
+ }
+}
+
+impl CosBuilder {
+ /// Set root of this backend.
+ ///
+ /// All operations will happen under this root.
+ pub fn root(&mut self, root: &str) -> &mut Self {
+ if !root.is_empty() {
+ self.root = Some(root.to_string())
+ }
+
+ self
+ }
+
+ /// Set endpoint of this backend.
+ ///
+ /// NOTE: the endpoint should not contain the bucket name or account id; the bucket is prepended to the endpoint host when the backend is built.
+ ///
+ /// # Examples
+ ///
+ /// - `https://cos.ap-singapore.myqcloud.com`
+ pub fn endpoint(&mut self, endpoint: &str) -> &mut Self {
+ if !endpoint.is_empty() {
+ self.endpoint = Some(endpoint.trim_end_matches('/').to_string());
+ }
+
+ self
+ }
+
+ /// Set secret_id of this backend.
+ /// - If it is set, we will take user's input first.
+ /// - If not, we will try to load it from environment.
+ pub fn secret_id(&mut self, secret_id: &str) -> &mut Self {
+ if !secret_id.is_empty() {
+ self.secret_id = Some(secret_id.to_string());
+ }
+
+ self
+ }
+
+ /// Set secret_key of this backend.
+ /// - If it is set, we will take user's input first.
+ /// - If not, we will try to load it from environment.
+ pub fn secret_key(&mut self, secret_key: &str) -> &mut Self {
+ if !secret_key.is_empty() {
+ self.secret_key = Some(secret_key.to_string());
+ }
+
+ self
+ }
+
+ /// Set bucket of this backend.
+ /// The param is required.
+ pub fn bucket(&mut self, bucket: &str) -> &mut Self {
+ if !bucket.is_empty() {
+ self.bucket = Some(bucket.to_string());
+ }
+
+ self
+ }
+
+ /// Specify the http client that is used by this service.
+ ///
+ /// # Notes
+ ///
+ /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
+ /// during minor updates.
+ pub fn http_client(&mut self, client: HttpClient) -> &mut Self {
+ self.http_client = Some(client);
+ self
+ }
+}
+
+impl Builder for CosBuilder {
+ const SCHEME: Scheme = Scheme::Cos;
+ type Accessor = CosBackend;
+
+ fn from_map(map: HashMap<String, String>) -> Self {
+ let mut builder = CosBuilder::default();
+
+ map.get("root").map(|v| builder.root(v));
+ map.get("bucket").map(|v| builder.bucket(v));
+ map.get("endpoint").map(|v| builder.endpoint(v));
+ map.get("secret_id").map(|v| builder.secret_id(v));
+ map.get("secret_key").map(|v| builder.secret_key(v));
+
+ builder
+ }
+
+ fn build(&mut self) -> Result<Self::Accessor> {
+ debug!("backend build started: {:?}", &self);
+
+ let root = normalize_root(&self.root.take().unwrap_or_default());
+ debug!("backend use root {}", root);
+
+ let bucket = match &self.bucket {
+ Some(bucket) => Ok(bucket.to_string()),
+ None => Err(
+ Error::new(ErrorKind::ConfigInvalid, "The bucket is
misconfigured")
+ .with_context("service", Scheme::Cos),
+ ),
+ }?;
+ debug!("backend use bucket {}", &bucket);
+
+ let uri = match &self.endpoint {
+ Some(endpoint) => endpoint.parse::<Uri>().map_err(|err| {
+ Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
+ .with_context("service", Scheme::Cos)
+ .with_context("endpoint", endpoint)
+ .set_source(err)
+ }),
+ None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is
empty")
+ .with_context("service", Scheme::Cos)),
+ }?;
+
+ let scheme = match uri.scheme_str() {
+ Some(scheme) => scheme.to_string(),
+ None => "https".to_string(),
+ };
+
+ // If endpoint contains bucket name, we should trim it. NOTE(review): `uri.host()` carries no "//" prefix, so this replace may never match — confirm.
+ let endpoint = uri.host().unwrap().replace(&format!("//{bucket}."),
"//");
+ debug!("backend use endpoint {}", &endpoint);
+
+ let client = if let Some(client) = self.http_client.take() {
+ client
+ } else {
+ HttpClient::new().map_err(|err| {
+ err.with_operation("Builder::build")
+ .with_context("service", Scheme::Cos)
+ })?
+ };
+
+ let config = TencentCosConfig {
+ access_key_id: self.secret_id.take(),
+ secret_access_key: self.secret_key.take(),
+ security_token: None,
+ };
+
+ let cred_loader = TencentCosCredentialLoader::new(config);
+
+ let signer = TencentCosSigner::new();
+
+ debug!("backend build finished");
+ Ok(CosBackend {
+ core: Arc::new(CosCore {
+ bucket: bucket.clone(),
+ root,
+ endpoint: format!("{}://{}.{}", &scheme, &bucket, &endpoint),
+ signer,
+ loader: cred_loader,
+ client,
+ }),
+ })
+ }
+}
+
+/// Backend for Tencent Cloud COS services.
+#[derive(Debug, Clone)]
+pub struct CosBackend {
+ core: Arc<CosCore>,
+}
+
+#[async_trait]
+impl Accessor for CosBackend {
+ type Reader = IncomingAsyncBody;
+ type BlockingReader = ();
+ type Writer = CosWriter;
+ type BlockingWriter = ();
+ type Appender = ();
+ type Pager = CosPager;
+ type BlockingPager = ();
+
+ fn info(&self) -> AccessorInfo {
+ let mut am = AccessorInfo::default();
+ am.set_scheme(Scheme::Cos)
+ .set_root(&self.core.root)
+ .set_name(&self.core.bucket)
+ .set_capability(Capability {
+ stat: true,
+ stat_with_if_match: true,
+ stat_with_if_none_match: true,
+
+ read: true,
+ read_can_next: true,
+ read_with_range: true,
+ read_with_if_match: true,
+ read_with_if_none_match: true,
+
+ write: true,
+ write_with_content_type: true,
+ write_with_cache_control: true,
+
+ delete: true,
+ create_dir: true,
+ copy: true,
+
+ list: true,
+ list_with_delimiter_slash: true,
+ list_without_delimiter: true,
+
+ presign: true,
+ presign_stat: true,
+ presign_read: true,
+ presign_write: true,
+
+ ..Default::default()
+ });
+
+ am
+ }
+
+ async fn create_dir(&self, path: &str, _: OpCreateDir) ->
Result<RpCreateDir> {
+ let mut req =
+ self.core
+ .cos_put_object_request(path, Some(0), None, None,
AsyncBody::Empty)?;
+
+ self.core.sign(&mut req).await?;
+
+ let resp = self.core.send(req).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::CREATED | StatusCode::OK => {
+ resp.into_body().consume().await?;
+ Ok(RpCreateDir::default())
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::Reader)> {
+ let resp = self
+ .core
+ .cos_get_object(path, args.range(), args.if_match(),
args.if_none_match())
+ .await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
+ let meta = parse_into_metadata(path, resp.headers())?;
+ Ok((RpRead::with_metadata(meta), resp.into_body()))
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
+ if args.content_length().is_none() {
+ return Err(Error::new(
+ ErrorKind::Unsupported,
+ "write without content length is not supported",
+ ));
+ }
+
+ Ok((
+ RpWrite::default(),
+ CosWriter::new(self.core.clone(), args, path.to_string()),
+ ))
+ }
+
+ async fn copy(&self, from: &str, to: &str, _args: OpCopy) ->
Result<RpCopy> {
+ let resp = self.core.cos_copy_object(from, to).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => {
+ resp.into_body().consume().await?;
+ Ok(RpCopy::default())
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+ // Stat root always returns a DIR.
+ if path == "/" {
+ return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
+ }
+
+ let resp = self
+ .core
+ .cos_head_object(path, args.if_match(), args.if_none_match())
+ .await?;
+
+ let status = resp.status();
+
+ // The response is very similar to azblob.
+ match status {
+ StatusCode::OK => parse_into_metadata(path,
resp.headers()).map(RpStat::new),
+ StatusCode::NOT_FOUND if path.ends_with('/') => {
+ Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
+ let resp = self.core.cos_delete_object(path).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::NO_CONTENT | StatusCode::ACCEPTED |
StatusCode::NOT_FOUND => {
+ Ok(RpDelete::default())
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
+ let mut req = match args.operation() {
+ PresignOperation::Stat(v) => {
+ self.core
+ .cos_head_object_request(path, v.if_match(),
v.if_none_match())?
+ }
+ PresignOperation::Read(v) => self.core.cos_get_object_request(
+ path,
+ v.range(),
+ v.if_match(),
+ v.if_none_match(),
+ )?,
+ PresignOperation::Write(v) => self.core.cos_put_object_request(
+ path,
+ None,
+ v.content_type(),
+ v.cache_control(),
+ AsyncBody::Empty,
+ )?,
+ };
+ self.core.sign_query(&mut req, args.expire()).await?;
+
+ // We don't need this request anymore, consume it directly.
+ let (parts, _) = req.into_parts();
+
+ Ok(RpPresign::new(PresignedRequest::new(
+ parts.method,
+ parts.uri,
+ parts.headers,
+ )))
+ }
+
+ async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Pager)> {
+ Ok((
+ RpList::default(),
+ CosPager::new(self.core.clone(), path, args.delimiter(),
args.limit()),
+ ))
+ }
+}
diff --git a/core/src/services/cos/core.rs b/core/src/services/cos/core.rs
new file mode 100644
index 00000000..bfcb8a2a
--- /dev/null
+++ b/core/src/services/cos/core.rs
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::time::Duration;
+
+use http::header::CACHE_CONTROL;
+use http::header::CONTENT_LENGTH;
+use http::header::CONTENT_TYPE;
+use http::header::IF_MATCH;
+use http::header::IF_NONE_MATCH;
+use http::Request;
+use http::Response;
+use reqsign::TencentCosCredential;
+use reqsign::TencentCosCredentialLoader;
+use reqsign::TencentCosSigner;
+
+use crate::raw::*;
+use crate::*;
+
+pub struct CosCore {
+ pub bucket: String,
+ pub root: String,
+ pub endpoint: String,
+
+ pub signer: TencentCosSigner,
+ pub loader: TencentCosCredentialLoader,
+ pub client: HttpClient,
+}
+
+impl Debug for CosCore {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("Backend")
+ .field("root", &self.root)
+ .field("bucket", &self.bucket)
+ .field("endpoint", &self.endpoint)
+ .finish_non_exhaustive()
+ }
+}
+
+impl CosCore {
+ async fn load_credential(&self) -> Result<Option<TencentCosCredential>> {
+ let cred = self
+ .loader
+ .load()
+ .await
+ .map_err(new_request_credential_error)?;
+
+ if let Some(cred) = cred {
+ Ok(Some(cred))
+ } else {
+ Ok(None)
+ }
+ }
+
+ pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
+ let cred = if let Some(cred) = self.load_credential().await? {
+ cred
+ } else {
+ return Ok(());
+ };
+
+ self.signer.sign(req, &cred).map_err(new_request_sign_error)
+ }
+
+ pub async fn sign_query<T>(&self, req: &mut Request<T>, duration:
Duration) -> Result<()> {
+ let cred = if let Some(cred) = self.load_credential().await? {
+ cred
+ } else {
+ return Ok(());
+ };
+
+ self.signer
+ .sign_query(req, duration, &cred)
+ .map_err(new_request_sign_error)
+ }
+
+ #[inline]
+ pub async fn send(&self, req: Request<AsyncBody>) ->
Result<Response<IncomingAsyncBody>> {
+ self.client.send(req).await
+ }
+}
+
+impl CosCore {
+ pub async fn cos_get_object(
+ &self,
+ path: &str,
+ range: BytesRange,
+ if_match: Option<&str>,
+ if_none_match: Option<&str>,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let mut req = self.cos_get_object_request(path, range, if_match,
if_none_match)?;
+
+ self.sign(&mut req).await?;
+
+ self.send(req).await
+ }
+
+ pub fn cos_get_object_request(
+ &self,
+ path: &str,
+ range: BytesRange,
+ if_match: Option<&str>,
+ if_none_match: Option<&str>,
+ ) -> Result<Request<AsyncBody>> {
+ let p = build_abs_path(&self.root, path);
+
+ let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
+
+ let mut req = Request::get(&url);
+
+ if let Some(if_match) = if_match {
+ req = req.header(IF_MATCH, if_match);
+ }
+
+ if !range.is_full() {
+ req = req.header(http::header::RANGE, range.to_header())
+ }
+
+ if let Some(if_none_match) = if_none_match {
+ req = req.header(IF_NONE_MATCH, if_none_match);
+ }
+
+ let req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ Ok(req)
+ }
+
+ pub fn cos_put_object_request(
+ &self,
+ path: &str,
+ size: Option<usize>,
+ content_type: Option<&str>,
+ cache_control: Option<&str>,
+ body: AsyncBody,
+ ) -> Result<Request<AsyncBody>> {
+ let p = build_abs_path(&self.root, path);
+
+ let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
+
+ let mut req = Request::put(&url);
+
+ if let Some(size) = size {
+ req = req.header(CONTENT_LENGTH, size)
+ }
+ if let Some(cache_control) = cache_control {
+ req = req.header(CACHE_CONTROL, cache_control)
+ }
+
+ if let Some(mime) = content_type {
+ req = req.header(CONTENT_TYPE, mime)
+ }
+
+ let req = req.body(body).map_err(new_request_build_error)?;
+
+ Ok(req)
+ }
+
+ pub async fn cos_head_object(
+ &self,
+ path: &str,
+ if_match: Option<&str>,
+ if_none_match: Option<&str>,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let mut req = self.cos_head_object_request(path, if_match,
if_none_match)?;
+
+ self.sign(&mut req).await?;
+
+ self.send(req).await
+ }
+
+ pub fn cos_head_object_request(
+ &self,
+ path: &str,
+ if_match: Option<&str>,
+ if_none_match: Option<&str>,
+ ) -> Result<Request<AsyncBody>> {
+ let p = build_abs_path(&self.root, path);
+
+ let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
+
+ let mut req = Request::head(&url);
+
+ if let Some(if_match) = if_match {
+ req = req.header(IF_MATCH, if_match);
+ }
+
+ if let Some(if_none_match) = if_none_match {
+ req = req.header(IF_NONE_MATCH, if_none_match);
+ }
+
+ let req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ Ok(req)
+ }
+
+ pub async fn cos_delete_object(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
+ let p = build_abs_path(&self.root, path);
+
+ let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
+
+ let req = Request::delete(&url);
+
+ let mut req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.sign(&mut req).await?;
+
+ self.send(req).await
+ }
+
+ pub async fn cos_copy_object(
+ &self,
+ from: &str,
+ to: &str,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let source = build_abs_path(&self.root, from);
+ let target = build_abs_path(&self.root, to);
+
+ let source = format!("/{}/{}", self.bucket,
percent_encode_path(&source));
+ let url = format!("{}/{}", self.endpoint,
percent_encode_path(&target));
+
+ let mut req = Request::put(&url)
+ .header("x-cos-copy-source", percent_encode_path(&source))
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.sign(&mut req).await?;
+
+ self.send(req).await
+ }
+
+ pub async fn cos_list_objects(
+ &self,
+ path: &str,
+ next_marker: &str,
+ delimiter: &str,
+ limit: Option<usize>,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let p = build_abs_path(&self.root, path);
+
+ let mut queries = vec![];
+ if !path.is_empty() {
+ queries.push(format!("prefix={}", percent_encode_path(&p)));
+ }
+ if !delimiter.is_empty() {
+ queries.push(format!("delimiter={delimiter}"));
+ }
+ if let Some(limit) = limit {
+ queries.push(format!("max-keys={limit}"));
+ }
+ if !next_marker.is_empty() {
+ queries.push(format!("marker={next_marker}"));
+ }
+
+ let url = if queries.is_empty() {
+ self.endpoint.to_string()
+ } else {
+ format!("{}?{}", self.endpoint, queries.join("&"))
+ };
+
+ let mut req = Request::get(&url)
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.sign(&mut req).await?;
+
+ self.send(req).await
+ }
+}
diff --git a/core/src/services/cos/error.rs b/core/src/services/cos/error.rs
new file mode 100644
index 00000000..ba451659
--- /dev/null
+++ b/core/src/services/cos/error.rs
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Buf;
+use http::Response;
+use http::StatusCode;
+use quick_xml::de;
+use serde::Deserialize;
+
+use crate::raw::*;
+use crate::Error;
+use crate::ErrorKind;
+use crate::Result;
+
+/// CosError is the error returned by cos service.
+#[derive(Default, Debug, Deserialize)]
+#[serde(default, rename_all = "PascalCase")]
+struct CosError {
+ code: String,
+ message: String,
+ resource: String,
+ request_id: String,
+ host_id: String,
+}
+
+/// Parse error response into Error.
+pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
+ let (parts, body) = resp.into_parts();
+ let bs = body.bytes().await?;
+
+ let (kind, retryable) = match parts.status {
+ StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
+ StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
+ StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => {
+ (ErrorKind::ConditionNotMatch, false)
+ }
+ StatusCode::INTERNAL_SERVER_ERROR
+ | StatusCode::BAD_GATEWAY
+ | StatusCode::SERVICE_UNAVAILABLE
+ | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true),
+ // COS could return `520 Origin Error` errors which should be retried.
+ v if v.as_u16() == 520 => (ErrorKind::Unexpected, true),
+
+ _ => (ErrorKind::Unexpected, false),
+ };
+
+ let message = match de::from_reader::<_, CosError>(bs.clone().reader()) {
+ Ok(cos_error) => format!("{cos_error:?}"),
+ Err(_) => String::from_utf8_lossy(&bs).into_owned(),
+ };
+
+ let mut err = Error::new(kind, &message).with_context("response",
format!("{parts:?}"));
+
+ if retryable {
+ err = err.set_temporary();
+ }
+
+ Ok(err)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_parse_error() {
+ let bs = bytes::Bytes::from(
+ r#"
+<?xml version="1.0" encoding="UTF-8"?>
+<Error>
+<Code>NoSuchKey</Code>
+<Message>The resource you requested does not exist</Message>
+<Resource>/example-bucket/object</Resource>
+<RequestId>001B21A61C6C0000013402C4616D5285</RequestId>
+<HostId>RkRCRDJENDc5MzdGQkQ4OUY3MTI4NTQ3NDk2Mjg0M0FBQUFBQUFBYmJiYmJiYmJD</HostId>
+</Error>
+"#,
+ );
+
+ let out: CosError = de::from_reader(bs.reader()).expect("must
success");
+ println!("{out:?}");
+
+ assert_eq!(out.code, "NoSuchKey");
+ assert_eq!(out.message, "The resource you requested does not exist");
+ assert_eq!(out.resource, "/example-bucket/object");
+ assert_eq!(out.request_id, "001B21A61C6C0000013402C4616D5285");
+ assert_eq!(
+ out.host_id,
+ "RkRCRDJENDc5MzdGQkQ4OUY3MTI4NTQ3NDk2Mjg0M0FBQUFBQUFBYmJiYmJiYmJD"
+ );
+ }
+}
diff --git a/core/src/services/cos/mod.rs b/core/src/services/cos/mod.rs
new file mode 100644
index 00000000..b51de4b7
--- /dev/null
+++ b/core/src/services/cos/mod.rs
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod backend;
+pub use backend::CosBuilder as Cos;
+
+mod core;
+mod error;
+mod pager;
+mod writer;
diff --git a/core/src/services/cos/pager.rs b/core/src/services/cos/pager.rs
new file mode 100644
index 00000000..10784eb2
--- /dev/null
+++ b/core/src/services/cos/pager.rs
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::Buf;
+use quick_xml::de;
+use serde::Deserialize;
+
+use super::core::CosCore;
+use super::error::parse_error;
+use crate::raw::*;
+use crate::EntryMode;
+use crate::Error;
+use crate::ErrorKind;
+use crate::Metadata;
+use crate::Result;
+
+pub struct CosPager {
+ core: Arc<CosCore>,
+ path: String,
+ delimiter: String,
+ limit: Option<usize>,
+
+ next_marker: String,
+ done: bool,
+}
+
+impl CosPager {
+ pub fn new(core: Arc<CosCore>, path: &str, delimiter: &str, limit: Option<usize>) -> Self {
+ Self {
+ core,
+ path: path.to_string(),
+ delimiter: delimiter.to_string(),
+ limit,
+
+ next_marker: "".to_string(),
+ done: false,
+ }
+ }
+}
+
+#[async_trait]
+impl oio::Page for CosPager {
+ async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
+ if self.done {
+ return Ok(None);
+ }
+
+ let resp = self
+ .core
+ .cos_list_objects(&self.path, &self.next_marker, &self.delimiter, self.limit)
+ .await?;
+
+ if resp.status() != http::StatusCode::OK {
+ return Err(parse_error(resp).await?);
+ }
+
+ let bs = resp.into_body().bytes().await?;
+
+ let output: Output = de::from_reader(bs.reader())
+ .map_err(|e| Error::new(ErrorKind::Unexpected, "deserialize xml").set_source(e))?;
+
+ // Try our best to check whether this list is done.
+ //
+ // - Check `next_marker`
+ self.done = match output.next_marker.as_ref() {
+ None => true,
+ Some(next_marker) => next_marker.is_empty(),
+ };
+ self.next_marker = output.next_marker.clone().unwrap_or_default();
+
+ let common_prefixes = output.common_prefixes;
+ let mut entries = Vec::with_capacity(common_prefixes.len() + output.contents.len());
+
+ for prefix in common_prefixes {
+ let de = oio::Entry::new(
+ &build_rel_path(&self.core.root, &prefix.prefix),
+ Metadata::new(EntryMode::DIR),
+ );
+
+ entries.push(de);
+ }
+
+ for object in output.contents {
+ if object.key.ends_with('/') {
+ continue;
+ }
+
+ let meta = Metadata::new(EntryMode::FILE).with_content_length(object.size);
+
+ let de = oio::Entry::new(&build_rel_path(&self.core.root, &object.key), meta);
+
+ entries.push(de);
+ }
+
+ Ok(Some(entries))
+ }
+}
+
+#[derive(Default, Debug, Deserialize)]
+#[serde(default, rename_all = "PascalCase")]
+struct Output {
+ name: String,
+ prefix: String,
+ contents: Vec<Content>,
+ common_prefixes: Vec<CommonPrefix>,
+ marker: String,
+ next_marker: Option<String>,
+}
+
+#[derive(Default, Debug, Deserialize)]
+#[serde(default, rename_all = "PascalCase")]
+struct CommonPrefix {
+ prefix: String,
+}
+
+#[derive(Default, Debug, Deserialize)]
+#[serde(default, rename_all = "PascalCase")]
+struct Content {
+ key: String,
+ size: u64,
+}
+
+#[cfg(test)]
+mod tests {
+ use bytes::Buf;
+
+ use super::*;
+
+ #[test]
+ fn test_parse_xml() {
+ let bs = bytes::Bytes::from(
+ r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<ListBucketResult>
+ <Name>examplebucket</Name>
+ <Prefix>obj</Prefix>
+ <Marker>obj002</Marker>
+ <NextMarker>obj004</NextMarker>
+ <MaxKeys>1000</MaxKeys>
+ <IsTruncated>false</IsTruncated>
+ <Contents>
+ <Key>obj002</Key>
+ <LastModified>2015-07-01T02:11:19.775Z</LastModified>
+ <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
+ <Size>9</Size>
+ <Owner>
+ <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
+ </Owner>
+ <StorageClass>STANDARD</StorageClass>
+ </Contents>
+ <Contents>
+ <Key>obj003</Key>
+ <LastModified>2015-07-01T02:11:19.775Z</LastModified>
+ <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
+ <Size>10</Size>
+ <Owner>
+ <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
+ </Owner>
+ <StorageClass>STANDARD</StorageClass>
+ </Contents>
+ <CommonPrefixes>
+ <Prefix>hello</Prefix>
+ </CommonPrefixes>
+ <CommonPrefixes>
+ <Prefix>world</Prefix>
+ </CommonPrefixes>
+</ListBucketResult>"#,
+ );
+ let out: Output = de::from_reader(bs.reader()).expect("must success");
+
+ assert_eq!(out.name, "examplebucket".to_string());
+ assert_eq!(out.prefix, "obj".to_string());
+ assert_eq!(out.marker, "obj002".to_string());
+ assert_eq!(out.next_marker, Some("obj004".to_string()),);
+ assert_eq!(
+ out.contents
+ .iter()
+ .map(|v| v.key.clone())
+ .collect::<Vec<String>>(),
+ ["obj002", "obj003"],
+ );
+ assert_eq!(
+ out.contents.iter().map(|v| v.size).collect::<Vec<u64>>(),
+ [9, 10],
+ );
+ assert_eq!(
+ out.common_prefixes
+ .iter()
+ .map(|v| v.prefix.clone())
+ .collect::<Vec<String>>(),
+ ["hello", "world"],
+ )
+ }
+}
diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs
new file mode 100644
index 00000000..f063002d
--- /dev/null
+++ b/core/src/services/cos/writer.rs
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use http::StatusCode;
+
+use super::core::CosCore;
+use super::error::parse_error;
+use crate::ops::OpWrite;
+use crate::raw::*;
+use crate::*;
+
+pub struct CosWriter {
+ core: Arc<CosCore>,
+
+ op: OpWrite,
+ path: String,
+}
+
+impl CosWriter {
+ pub fn new(core: Arc<CosCore>, op: OpWrite, path: String) -> Self {
+ CosWriter { core, op, path }
+ }
+}
+
+#[async_trait]
+impl oio::Write for CosWriter {
+ async fn write(&mut self, bs: Bytes) -> Result<()> {
+ let mut req = self.core.cos_put_object_request(
+ &self.path,
+ Some(bs.len()),
+ self.op.content_type(),
+ self.op.cache_control(),
+ AsyncBody::Bytes(bs),
+ )?;
+
+ self.core.sign(&mut req).await?;
+
+ let resp = self.core.send(req).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::CREATED | StatusCode::OK => {
+ resp.into_body().consume().await?;
+ Ok(())
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn abort(&mut self) -> Result<()> {
+ Ok(())
+ }
+
+ async fn close(&mut self) -> Result<()> {
+ Ok(())
+ }
+}
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index 0ce43b03..8dd850d4 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -29,6 +29,11 @@ mod azdfs;
#[cfg(feature = "services-azdfs")]
pub use azdfs::Azdfs;
+#[cfg(feature = "services-cos")]
+mod cos;
+#[cfg(feature = "services-cos")]
+pub use cos::Cos;
+
#[cfg(feature = "services-dashmap")]
mod dashmap;
#[cfg(feature = "services-dashmap")]
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index 27c4fefa..edc1dde1 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -35,6 +35,8 @@ pub enum Scheme {
Azblob,
/// [azdfs][crate::services::Azdfs]: Azure Data Lake Storage Gen2.
Azdfs,
+ /// [cos][crate::services::Cos]: Tencent Cloud Object Storage services.
+ Cos,
/// [dashmap][crate::services::Dashmap]: dashmap backend support.
Dashmap,
/// [fs][crate::services::Fs]: POSIX alike file system.
@@ -123,6 +125,7 @@ impl FromStr for Scheme {
match s.as_str() {
"azblob" => Ok(Scheme::Azblob),
"azdfs" => Ok(Scheme::Azdfs),
+ "cos" => Ok(Scheme::Cos),
"dashmap" => Ok(Scheme::Dashmap),
"fs" => Ok(Scheme::Fs),
"gcs" => Ok(Scheme::Gcs),
@@ -155,6 +158,7 @@ impl From<Scheme> for &'static str {
match v {
Scheme::Azblob => "azblob",
Scheme::Azdfs => "azdfs",
+ Scheme::Cos => "cos",
Scheme::Dashmap => "dashmap",
Scheme::Fs => "fs",
Scheme::Gcs => "gcs",
diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs
index fd477f05..6e190c44 100644
--- a/core/tests/behavior/main.rs
+++ b/core/tests/behavior/main.rs
@@ -95,6 +95,8 @@ macro_rules! behavior_tests {
behavior_tests!(Azblob);
#[cfg(feature = "services-azdfs")]
behavior_tests!(Azdfs);
+#[cfg(feature = "services-cos")]
+behavior_tests!(Cos);
#[cfg(feature = "services-dashmap")]
behavior_tests!(Dashmap);
#[cfg(feature = "services-fs")]