This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new dc32a6acf feat(core): Implement list with deleted and versions for cos (#5514)
dc32a6acf is described below
commit dc32a6acfcbfce63c2cb2541bdfb9648523e4958
Author: hoslo <[email protected]>
AuthorDate: Wed Jan 8 17:36:22 2025 +0800
feat(core): Implement list with deleted and versions for cos (#5514)
Co-authored-by: Xuanwo <[email protected]>
---
.github/services/cos/cos/action.yml | 7 +-
core/src/services/cos/backend.rs | 52 ++++++++++++--
core/src/services/cos/config.rs | 2 +
core/src/services/cos/core.rs | 133 ++++++++++++++++++++++++++++++++--
core/src/services/cos/delete.rs | 4 +-
core/src/services/cos/lister.rs | 140 ++++++++++++++++++++++++++++++++++++
6 files changed, 324 insertions(+), 14 deletions(-)
diff --git a/.github/services/cos/cos/action.yml b/.github/services/cos/cos/action.yml
index 6dc23ede7..c310a6d14 100644
--- a/.github/services/cos/cos/action.yml
+++ b/.github/services/cos/cos/action.yml
@@ -16,7 +16,7 @@
# under the License.
name: cos
-description: 'Behavior test for COS. This service is sponsored by @datafuse_labs.'
+description: "Behavior test for COS. This service is sponsored by @datafuse_labs."
runs:
using: "composite"
@@ -30,3 +30,8 @@ runs:
OPENDAL_COS_ENDPOINT: op://services/cos/endpoint
OPENDAL_COS_SECRET_ID: op://services/cos/secret_id
OPENDAL_COS_SECRET_KEY: op://services/cos/secret_key
+
+ - name: Add extra settings
+ shell: bash
+ run: |
+ echo "OPENDAL_COS_ENABLE_VERSIONING=true" >> $GITHUB_ENV
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index 86d7e342c..4bbb658c8 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -29,9 +29,10 @@ use reqsign::TencentCosSigner;
use super::core::*;
use super::delete::CosDeleter;
use super::error::parse_error;
-use super::lister::CosLister;
+use super::lister::{CosLister, CosListers, CosObjectVersionsLister};
use super::writer::CosWriter;
use super::writer::CosWriters;
+use crate::raw::oio::PageLister;
use crate::raw::*;
use crate::services::CosConfig;
use crate::*;
@@ -123,6 +124,13 @@ impl CosBuilder {
self
}
+ /// Set bucket versioning status for this backend
+ pub fn enable_versioning(mut self, enabled: bool) -> Self {
+ self.config.enable_versioning = enabled;
+
+ self
+ }
+
/// Disable config load so that opendal will not load config from
/// environment.
///
@@ -215,6 +223,7 @@ impl Builder for CosBuilder {
bucket: bucket.clone(),
root,
endpoint: format!("{}://{}.{}", &scheme, &bucket, &endpoint),
+ enable_versioning: self.config.enable_versioning,
signer,
loader: cred_loader,
client,
@@ -232,7 +241,7 @@ pub struct CosBackend {
impl Access for CosBackend {
type Reader = HttpBody;
type Writer = CosWriters;
- type Lister = oio::PageLister<CosLister>;
+ type Lister = CosListers;
type Deleter = oio::OneShotDeleter<CosDeleter>;
type BlockingReader = ();
type BlockingWriter = ();
@@ -253,15 +262,18 @@ impl Access for CosBackend {
stat_has_content_type: true,
stat_has_content_encoding: true,
stat_has_content_range: true,
+ stat_with_version: self.core.enable_versioning,
stat_has_etag: true,
stat_has_content_md5: true,
stat_has_last_modified: true,
stat_has_content_disposition: true,
+ stat_has_version: true,
read: true,
read_with_if_match: true,
read_with_if_none_match: true,
+ read_with_version: self.core.enable_versioning,
write: true,
write_can_empty: true,
@@ -270,8 +282,8 @@ impl Access for CosBackend {
write_with_content_type: true,
write_with_cache_control: true,
write_with_content_disposition: true,
- // TODO: set this to false while version has been enabled.
- write_with_if_not_exists: true,
+ // Cos doesn't support forbid overwrite while version has been enabled.
+ write_with_if_not_exists: !self.core.enable_versioning,
// The min multipart size of COS is 1 MiB.
//
// ref: <https://www.tencentcloud.com/document/product/436/14112>
@@ -286,10 +298,13 @@ impl Access for CosBackend {
},
delete: true,
+ delete_with_version: self.core.enable_versioning,
copy: true,
list: true,
list_with_recursive: true,
+ list_with_versions: self.core.enable_versioning,
+ list_with_deleted: self.core.enable_versioning,
list_has_content_length: true,
presign: true,
@@ -311,7 +326,16 @@ impl Access for CosBackend {
let status = resp.status();
match status {
- StatusCode::OK => parse_into_metadata(path, resp.headers()).map(RpStat::new),
+ StatusCode::OK => {
+ let headers = resp.headers();
+ let mut meta = parse_into_metadata(path, headers)?;
+
+ if let Some(v) = parse_header_to_str(headers, "x-cos-version-id")? {
+ meta.set_version(v);
+ }
+
+ Ok(RpStat::new(meta))
+ }
_ => Err(parse_error(resp)),
}
}
@@ -357,8 +381,22 @@ impl Access for CosBackend {
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
- let l = CosLister::new(self.core.clone(), path, args.recursive(), args.limit());
- Ok((RpList::default(), oio::PageLister::new(l)))
+ let l = if args.versions() || args.deleted() {
+ TwoWays::Two(PageLister::new(CosObjectVersionsLister::new(
+ self.core.clone(),
+ path,
+ args,
+ )))
+ } else {
+ TwoWays::One(PageLister::new(CosLister::new(
+ self.core.clone(),
+ path,
+ args.recursive(),
+ args.limit(),
+ )))
+ };
+
+ Ok((RpList::default(), l))
}
async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
diff --git a/core/src/services/cos/config.rs b/core/src/services/cos/config.rs
index 8c73f00ca..87436a84b 100644
--- a/core/src/services/cos/config.rs
+++ b/core/src/services/cos/config.rs
@@ -35,6 +35,8 @@ pub struct CosConfig {
pub secret_key: Option<String>,
/// Bucket of this backend.
pub bucket: Option<String>,
+ /// is bucket versioning enabled for this bucket
+ pub enable_versioning: bool,
/// Disable config load so that opendal will not load config from
pub disable_config_load: bool,
}
diff --git a/core/src/services/cos/core.rs b/core/src/services/cos/core.rs
index 4d28b2688..020b57bab 100644
--- a/core/src/services/cos/core.rs
+++ b/core/src/services/cos/core.rs
@@ -17,6 +17,7 @@
use std::fmt::Debug;
use std::fmt::Formatter;
+use std::fmt::Write;
use std::time::Duration;
use bytes::Bytes;
@@ -37,10 +38,15 @@ use serde::Serialize;
use crate::raw::*;
use crate::*;
+pub mod constants {
+ pub const COS_QUERY_VERSION_ID: &str = "versionId";
+}
+
pub struct CosCore {
pub bucket: String,
pub root: String,
pub endpoint: String,
+ pub enable_versioning: bool,
pub signer: TencentCosSigner,
pub loader: TencentCosCredentialLoader,
@@ -125,7 +131,19 @@ impl CosCore {
) -> Result<Request<Buffer>> {
let p = build_abs_path(&self.root, path);
- let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
+ let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
+
+ let mut query_args = Vec::new();
+ if let Some(version) = args.version() {
+ query_args.push(format!(
+ "{}={}",
+ constants::COS_QUERY_VERSION_ID,
+ percent_decode_path(version)
+ ))
+ }
+ if !query_args.is_empty() {
+ url.push_str(&format!("?{}", query_args.join("&")));
+ }
let mut req = Request::get(&url);
@@ -200,7 +218,19 @@ impl CosCore {
pub fn cos_head_object_request(&self, path: &str, args: &OpStat) -> Result<Request<Buffer>> {
let p = build_abs_path(&self.root, path);
- let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
+ let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
+
+ let mut query_args = Vec::new();
+ if let Some(version) = args.version() {
+ query_args.push(format!(
+ "{}={}",
+ constants::COS_QUERY_VERSION_ID,
+ percent_decode_path(version)
+ ))
+ }
+ if !query_args.is_empty() {
+ url.push_str(&format!("?{}", query_args.join("&")));
+ }
let mut req = Request::head(&url);
@@ -217,10 +247,22 @@ impl CosCore {
Ok(req)
}
- pub async fn cos_delete_object(&self, path: &str) -> Result<Response<Buffer>> {
+ pub async fn cos_delete_object(&self, path: &str, args: &OpDelete) -> Result<Response<Buffer>> {
let p = build_abs_path(&self.root, path);
- let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
+ let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
+
+ let mut query_args = Vec::new();
+ if let Some(version) = args.version() {
+ query_args.push(format!(
+ "{}={}",
+ constants::COS_QUERY_VERSION_ID,
+ percent_decode_path(version)
+ ))
+ }
+ if !query_args.is_empty() {
+ url.push_str(&format!("?{}", query_args.join("&")));
+ }
let req = Request::delete(&url);
@@ -434,6 +476,50 @@ impl CosCore {
self.sign(&mut req).await?;
self.send(req).await
}
+
+ pub async fn cos_list_object_versions(
+ &self,
+ prefix: &str,
+ delimiter: &str,
+ limit: Option<usize>,
+ key_marker: &str,
+ version_id_marker: &str,
+ ) -> Result<Response<Buffer>> {
+ let p = build_abs_path(&self.root, prefix);
+
+ let mut url = format!("{}?versions", self.endpoint);
+ if !p.is_empty() {
+ write!(url, "&prefix={}", percent_encode_path(p.as_str()))
+ .expect("write into string must succeed");
+ }
+ if !delimiter.is_empty() {
+ write!(url, "&delimiter={}", delimiter).expect("write into string must succeed");
+ }
+
+ if let Some(limit) = limit {
+ write!(url, "&max-keys={}", limit).expect("write into string must succeed");
+ }
+ if !key_marker.is_empty() {
+ write!(url, "&key-marker={}", percent_encode_path(key_marker))
+ .expect("write into string must succeed");
+ }
+ if !version_id_marker.is_empty() {
+ write!(
+ url,
+ "&version-id-marker={}",
+ percent_encode_path(version_id_marker)
+ )
+ .expect("write into string must succeed");
+ }
+
+ let mut req = Request::get(&url)
+ .body(Buffer::new())
+ .map_err(new_request_build_error)?;
+
+ self.sign(&mut req).await?;
+
+ self.send(req).await
+ }
}
/// Result of CreateMultipartUpload
@@ -511,6 +597,45 @@ pub struct ListObjectsOutputContent {
pub size: u64,
}
+#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct OutputCommonPrefix {
+ pub prefix: String,
+}
+
+/// Output of ListObjectVersions
+#[derive(Default, Debug, Deserialize)]
+#[serde(default, rename_all = "PascalCase")]
+pub struct ListObjectVersionsOutput {
+ pub is_truncated: Option<bool>,
+ pub next_key_marker: Option<String>,
+ pub next_version_id_marker: Option<String>,
+ pub common_prefixes: Vec<OutputCommonPrefix>,
+ pub version: Vec<ListObjectVersionsOutputVersion>,
+ pub delete_marker: Vec<ListObjectVersionsOutputDeleteMarker>,
+}
+
+#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct ListObjectVersionsOutputVersion {
+ pub key: String,
+ pub version_id: String,
+ pub is_latest: bool,
+ pub size: u64,
+ pub last_modified: String,
+ #[serde(rename = "ETag")]
+ pub etag: Option<String>,
+}
+
+#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct ListObjectVersionsOutputDeleteMarker {
+ pub key: String,
+ pub version_id: String,
+ pub is_latest: bool,
+ pub last_modified: String,
+}
+
#[cfg(test)]
mod tests {
use bytes::Buf;
diff --git a/core/src/services/cos/delete.rs b/core/src/services/cos/delete.rs
index 17319ba91..bd29f8521 100644
--- a/core/src/services/cos/delete.rs
+++ b/core/src/services/cos/delete.rs
@@ -33,8 +33,8 @@ impl CosDeleter {
}
impl oio::OneShotDelete for CosDeleter {
- async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
- let resp = self.core.cos_delete_object(&path).await?;
+ async fn delete_once(&self, path: String, args: OpDelete) -> Result<()> {
+ let resp = self.core.cos_delete_object(&path, &args).await?;
let status = resp.status();
diff --git a/core/src/services/cos/lister.rs b/core/src/services/cos/lister.rs
index 880544425..465bf97bc 100644
--- a/core/src/services/cos/lister.rs
+++ b/core/src/services/cos/lister.rs
@@ -22,11 +22,15 @@ use quick_xml::de;
use super::core::*;
use super::error::parse_error;
+use crate::raw::oio::PageContext;
use crate::raw::*;
use crate::EntryMode;
+use crate::Error;
use crate::Metadata;
use crate::Result;
+pub type CosListers = TwoWays<oio::PageLister<CosLister>, oio::PageLister<CosObjectVersionsLister>>;
+
pub struct CosLister {
core: Arc<CosCore>,
path: String,
@@ -95,3 +99,139 @@ impl oio::PageList for CosLister {
Ok(())
}
}
+
+/// refer: https://cloud.tencent.com/document/product/436/35521
+pub struct CosObjectVersionsLister {
+ core: Arc<CosCore>,
+
+ prefix: String,
+ args: OpList,
+
+ delimiter: &'static str,
+ abs_start_after: Option<String>,
+}
+
+impl CosObjectVersionsLister {
+ pub fn new(core: Arc<CosCore>, path: &str, args: OpList) -> Self {
+ let delimiter = if args.recursive() { "" } else { "/" };
+ let abs_start_after = args
+ .start_after()
+ .map(|start_after| build_abs_path(&core.root, start_after));
+
+ Self {
+ core,
+ prefix: path.to_string(),
+ args,
+ delimiter,
+ abs_start_after,
+ }
+ }
+}
+
+impl oio::PageList for CosObjectVersionsLister {
+ async fn next_page(&self, ctx: &mut PageContext) -> Result<()> {
+ let markers = ctx.token.rsplit_once(" ");
+ let (key_marker, version_id_marker) = if let Some(data) = markers {
+ data
+ } else if let Some(start_after) = &self.abs_start_after {
+ (start_after.as_str(), "")
+ } else {
+ ("", "")
+ };
+
+ let resp = self
+ .core
+ .cos_list_object_versions(
+ &self.prefix,
+ self.delimiter,
+ self.args.limit(),
+ key_marker,
+ version_id_marker,
+ )
+ .await?;
+ if resp.status() != http::StatusCode::OK {
+ return Err(parse_error(resp));
+ }
+
+ let body = resp.into_body();
+ let output: ListObjectVersionsOutput = de::from_reader(body.reader())
+ .map_err(new_xml_deserialize_error)
+ // Allow Cos list to retry on XML deserialization errors.
+ //
+ // This is because the Cos list API may return incomplete XML data under high load.
+ // We are confident that our XML decoding logic is correct. When this error occurs,
+ // we allow retries to obtain the correct data.
+ .map_err(Error::set_temporary)?;
+
+ ctx.done = if let Some(is_truncated) = output.is_truncated {
+ !is_truncated
+ } else {
+ false
+ };
+ ctx.token = format!(
+ "{} {}",
+ output.next_key_marker.unwrap_or_default(),
+ output.next_version_id_marker.unwrap_or_default()
+ );
+
+ for prefix in output.common_prefixes {
+ let de = oio::Entry::new(
+ &build_rel_path(&self.core.root, &prefix.prefix),
+ Metadata::new(EntryMode::DIR),
+ );
+ ctx.entries.push_back(de);
+ }
+
+ for version_object in output.version {
+ // `list` must be additive, so we need to include the latest version object
+ // even if `versions` is not enabled.
+ //
+ // Here we skip all non-latest version objects if `versions` is not enabled.
+ if !(self.args.versions() || version_object.is_latest) {
+ continue;
+ }
+
+ let mut path = build_rel_path(&self.core.root, &version_object.key);
+ if path.is_empty() {
+ path = "/".to_owned();
+ }
+
+ let mut meta = Metadata::new(EntryMode::from_path(&path));
+ meta.set_version(&version_object.version_id);
+ meta.set_is_current(version_object.is_latest);
+ meta.set_content_length(version_object.size);
+ meta.set_last_modified(parse_datetime_from_rfc3339(
+ version_object.last_modified.as_str(),
+ )?);
+ if let Some(etag) = version_object.etag {
+ meta.set_etag(&etag);
+ meta.set_content_md5(etag.trim_matches('"'));
+ }
+
+ let entry = oio::Entry::new(&path, meta);
+ ctx.entries.push_back(entry);
+ }
+
+ if self.args.deleted() {
+ for delete_marker in output.delete_marker {
+ let mut path = build_rel_path(&self.core.root, &delete_marker.key);
+ if path.is_empty() {
+ path = "/".to_owned();
+ }
+
+ let mut meta = Metadata::new(EntryMode::FILE);
+ meta.set_version(&delete_marker.version_id);
+ meta.set_is_deleted(true);
+ meta.set_is_current(delete_marker.is_latest);
+ meta.set_last_modified(parse_datetime_from_rfc3339(
+ delete_marker.last_modified.as_str(),
+ )?);
+
+ let entry = oio::Entry::new(&path, meta);
+ ctx.entries.push_back(entry);
+ }
+ }
+
+ Ok(())
+ }
+}