This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new dc32a6acf feat(core): Implement list with deleted and versions for cos (#5514)
dc32a6acf is described below

commit dc32a6acfcbfce63c2cb2541bdfb9648523e4958
Author: hoslo <[email protected]>
AuthorDate: Wed Jan 8 17:36:22 2025 +0800

    feat(core): Implement list with deleted and versions for cos (#5514)
    
    Co-authored-by: Xuanwo <[email protected]>
---
 .github/services/cos/cos/action.yml |   7 +-
 core/src/services/cos/backend.rs    |  52 ++++++++++++--
 core/src/services/cos/config.rs     |   2 +
 core/src/services/cos/core.rs       | 133 ++++++++++++++++++++++++++++++++--
 core/src/services/cos/delete.rs     |   4 +-
 core/src/services/cos/lister.rs     | 140 ++++++++++++++++++++++++++++++++++++
 6 files changed, 324 insertions(+), 14 deletions(-)

diff --git a/.github/services/cos/cos/action.yml b/.github/services/cos/cos/action.yml
index 6dc23ede7..c310a6d14 100644
--- a/.github/services/cos/cos/action.yml
+++ b/.github/services/cos/cos/action.yml
@@ -16,7 +16,7 @@
 # under the License.
 
 name: cos
-description: 'Behavior test for COS. This service is sponsored by @datafuse_labs.'
+description: "Behavior test for COS. This service is sponsored by @datafuse_labs."
 
 runs:
   using: "composite"
@@ -30,3 +30,8 @@ runs:
         OPENDAL_COS_ENDPOINT: op://services/cos/endpoint
         OPENDAL_COS_SECRET_ID: op://services/cos/secret_id
         OPENDAL_COS_SECRET_KEY: op://services/cos/secret_key
+
+    - name: Add extra settings
+      shell: bash
+      run: |
+        echo "OPENDAL_COS_ENABLE_VERSIONING=true" >> $GITHUB_ENV
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index 86d7e342c..4bbb658c8 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -29,9 +29,10 @@ use reqsign::TencentCosSigner;
 use super::core::*;
 use super::delete::CosDeleter;
 use super::error::parse_error;
-use super::lister::CosLister;
+use super::lister::{CosLister, CosListers, CosObjectVersionsLister};
 use super::writer::CosWriter;
 use super::writer::CosWriters;
+use crate::raw::oio::PageLister;
 use crate::raw::*;
 use crate::services::CosConfig;
 use crate::*;
@@ -123,6 +124,13 @@ impl CosBuilder {
         self
     }
 
+    /// Set bucket versioning status for this backend
+    pub fn enable_versioning(mut self, enabled: bool) -> Self {
+        self.config.enable_versioning = enabled;
+
+        self
+    }
+
     /// Disable config load so that opendal will not load config from
     /// environment.
     ///
@@ -215,6 +223,7 @@ impl Builder for CosBuilder {
                 bucket: bucket.clone(),
                 root,
                 endpoint: format!("{}://{}.{}", &scheme, &bucket, &endpoint),
+                enable_versioning: self.config.enable_versioning,
                 signer,
                 loader: cred_loader,
                 client,
@@ -232,7 +241,7 @@ pub struct CosBackend {
 impl Access for CosBackend {
     type Reader = HttpBody;
     type Writer = CosWriters;
-    type Lister = oio::PageLister<CosLister>;
+    type Lister = CosListers;
     type Deleter = oio::OneShotDeleter<CosDeleter>;
     type BlockingReader = ();
     type BlockingWriter = ();
@@ -253,15 +262,18 @@ impl Access for CosBackend {
                 stat_has_content_type: true,
                 stat_has_content_encoding: true,
                 stat_has_content_range: true,
+                stat_with_version: self.core.enable_versioning,
                 stat_has_etag: true,
                 stat_has_content_md5: true,
                 stat_has_last_modified: true,
                 stat_has_content_disposition: true,
+                stat_has_version: true,
 
                 read: true,
 
                 read_with_if_match: true,
                 read_with_if_none_match: true,
+                read_with_version: self.core.enable_versioning,
 
                 write: true,
                 write_can_empty: true,
@@ -270,8 +282,8 @@ impl Access for CosBackend {
                 write_with_content_type: true,
                 write_with_cache_control: true,
                 write_with_content_disposition: true,
-                // TODO: set this to false while version has been enabled.
-                write_with_if_not_exists: true,
+                // Cos doesn't support forbid overwrite while version has been enabled.
+                write_with_if_not_exists: !self.core.enable_versioning,
                 // The min multipart size of COS is 1 MiB.
                 //
                 // ref: <https://www.tencentcloud.com/document/product/436/14112>
@@ -286,10 +298,13 @@ impl Access for CosBackend {
                 },
 
                 delete: true,
+                delete_with_version: self.core.enable_versioning,
                 copy: true,
 
                 list: true,
                 list_with_recursive: true,
+                list_with_versions: self.core.enable_versioning,
+                list_with_deleted: self.core.enable_versioning,
                 list_has_content_length: true,
 
                 presign: true,
@@ -311,7 +326,16 @@ impl Access for CosBackend {
         let status = resp.status();
 
         match status {
-            StatusCode::OK => parse_into_metadata(path, resp.headers()).map(RpStat::new),
+            StatusCode::OK => {
+                let headers = resp.headers();
+                let mut meta = parse_into_metadata(path, headers)?;
+
+                if let Some(v) = parse_header_to_str(headers, "x-cos-version-id")? {
+                    meta.set_version(v);
+                }
+
+                Ok(RpStat::new(meta))
+            }
             _ => Err(parse_error(resp)),
         }
     }
@@ -357,8 +381,22 @@ impl Access for CosBackend {
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
-        let l = CosLister::new(self.core.clone(), path, args.recursive(), args.limit());
-        Ok((RpList::default(), oio::PageLister::new(l)))
+        let l = if args.versions() || args.deleted() {
+            TwoWays::Two(PageLister::new(CosObjectVersionsLister::new(
+                self.core.clone(),
+                path,
+                args,
+            )))
+        } else {
+            TwoWays::One(PageLister::new(CosLister::new(
+                self.core.clone(),
+                path,
+                args.recursive(),
+                args.limit(),
+            )))
+        };
+
+        Ok((RpList::default(), l))
     }
 
     async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
diff --git a/core/src/services/cos/config.rs b/core/src/services/cos/config.rs
index 8c73f00ca..87436a84b 100644
--- a/core/src/services/cos/config.rs
+++ b/core/src/services/cos/config.rs
@@ -35,6 +35,8 @@ pub struct CosConfig {
     pub secret_key: Option<String>,
     /// Bucket of this backend.
     pub bucket: Option<String>,
+    /// Whether bucket versioning is enabled for this bucket.
+    pub enable_versioning: bool,
     /// Disable config load so that opendal will not load config from
     pub disable_config_load: bool,
 }
diff --git a/core/src/services/cos/core.rs b/core/src/services/cos/core.rs
index 4d28b2688..020b57bab 100644
--- a/core/src/services/cos/core.rs
+++ b/core/src/services/cos/core.rs
@@ -17,6 +17,7 @@
 
 use std::fmt::Debug;
 use std::fmt::Formatter;
+use std::fmt::Write;
 use std::time::Duration;
 
 use bytes::Bytes;
@@ -37,10 +38,15 @@ use serde::Serialize;
 use crate::raw::*;
 use crate::*;
 
+pub mod constants {
+    pub const COS_QUERY_VERSION_ID: &str = "versionId";
+}
+
 pub struct CosCore {
     pub bucket: String,
     pub root: String,
     pub endpoint: String,
+    pub enable_versioning: bool,
 
     pub signer: TencentCosSigner,
     pub loader: TencentCosCredentialLoader,
@@ -125,7 +131,19 @@ impl CosCore {
     ) -> Result<Request<Buffer>> {
         let p = build_abs_path(&self.root, path);
 
-        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
+        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
+
+        let mut query_args = Vec::new();
+        if let Some(version) = args.version() {
+            query_args.push(format!(
+                "{}={}",
+                constants::COS_QUERY_VERSION_ID,
+                percent_decode_path(version)
+            ))
+        }
+        if !query_args.is_empty() {
+            url.push_str(&format!("?{}", query_args.join("&")));
+        }
 
         let mut req = Request::get(&url);
 
@@ -200,7 +218,19 @@ impl CosCore {
     pub fn cos_head_object_request(&self, path: &str, args: &OpStat) -> Result<Request<Buffer>> {
         let p = build_abs_path(&self.root, path);
 
-        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
+        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
+
+        let mut query_args = Vec::new();
+        if let Some(version) = args.version() {
+            query_args.push(format!(
+                "{}={}",
+                constants::COS_QUERY_VERSION_ID,
+                percent_decode_path(version)
+            ))
+        }
+        if !query_args.is_empty() {
+            url.push_str(&format!("?{}", query_args.join("&")));
+        }
 
         let mut req = Request::head(&url);
 
@@ -217,10 +247,22 @@ impl CosCore {
         Ok(req)
     }
 
-    pub async fn cos_delete_object(&self, path: &str) -> Result<Response<Buffer>> {
+    pub async fn cos_delete_object(&self, path: &str, args: &OpDelete) -> Result<Response<Buffer>> {
         let p = build_abs_path(&self.root, path);
 
-        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
+        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
+
+        let mut query_args = Vec::new();
+        if let Some(version) = args.version() {
+            query_args.push(format!(
+                "{}={}",
+                constants::COS_QUERY_VERSION_ID,
+                percent_decode_path(version)
+            ))
+        }
+        if !query_args.is_empty() {
+            url.push_str(&format!("?{}", query_args.join("&")));
+        }
 
         let req = Request::delete(&url);
 
@@ -434,6 +476,50 @@ impl CosCore {
         self.sign(&mut req).await?;
         self.send(req).await
     }
+
+    pub async fn cos_list_object_versions(
+        &self,
+        prefix: &str,
+        delimiter: &str,
+        limit: Option<usize>,
+        key_marker: &str,
+        version_id_marker: &str,
+    ) -> Result<Response<Buffer>> {
+        let p = build_abs_path(&self.root, prefix);
+
+        let mut url = format!("{}?versions", self.endpoint);
+        if !p.is_empty() {
+            write!(url, "&prefix={}", percent_encode_path(p.as_str()))
+                .expect("write into string must succeed");
+        }
+        if !delimiter.is_empty() {
+            write!(url, "&delimiter={}", delimiter).expect("write into string must succeed");
+        }
+
+        if let Some(limit) = limit {
+            write!(url, "&max-keys={}", limit).expect("write into string must succeed");
+        }
+        if !key_marker.is_empty() {
+            write!(url, "&key-marker={}", percent_encode_path(key_marker))
+                .expect("write into string must succeed");
+        }
+        if !version_id_marker.is_empty() {
+            write!(
+                url,
+                "&version-id-marker={}",
+                percent_encode_path(version_id_marker)
+            )
+            .expect("write into string must succeed");
+        }
+
+        let mut req = Request::get(&url)
+            .body(Buffer::new())
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
 }
 
 /// Result of CreateMultipartUpload
@@ -511,6 +597,45 @@ pub struct ListObjectsOutputContent {
     pub size: u64,
 }
 
+#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct OutputCommonPrefix {
+    pub prefix: String,
+}
+
+/// Output of ListObjectVersions
+#[derive(Default, Debug, Deserialize)]
+#[serde(default, rename_all = "PascalCase")]
+pub struct ListObjectVersionsOutput {
+    pub is_truncated: Option<bool>,
+    pub next_key_marker: Option<String>,
+    pub next_version_id_marker: Option<String>,
+    pub common_prefixes: Vec<OutputCommonPrefix>,
+    pub version: Vec<ListObjectVersionsOutputVersion>,
+    pub delete_marker: Vec<ListObjectVersionsOutputDeleteMarker>,
+}
+
+#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct ListObjectVersionsOutputVersion {
+    pub key: String,
+    pub version_id: String,
+    pub is_latest: bool,
+    pub size: u64,
+    pub last_modified: String,
+    #[serde(rename = "ETag")]
+    pub etag: Option<String>,
+}
+
+#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct ListObjectVersionsOutputDeleteMarker {
+    pub key: String,
+    pub version_id: String,
+    pub is_latest: bool,
+    pub last_modified: String,
+}
+
 #[cfg(test)]
 mod tests {
     use bytes::Buf;
diff --git a/core/src/services/cos/delete.rs b/core/src/services/cos/delete.rs
index 17319ba91..bd29f8521 100644
--- a/core/src/services/cos/delete.rs
+++ b/core/src/services/cos/delete.rs
@@ -33,8 +33,8 @@ impl CosDeleter {
 }
 
 impl oio::OneShotDelete for CosDeleter {
-    async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
-        let resp = self.core.cos_delete_object(&path).await?;
+    async fn delete_once(&self, path: String, args: OpDelete) -> Result<()> {
+        let resp = self.core.cos_delete_object(&path, &args).await?;
 
         let status = resp.status();
 
diff --git a/core/src/services/cos/lister.rs b/core/src/services/cos/lister.rs
index 880544425..465bf97bc 100644
--- a/core/src/services/cos/lister.rs
+++ b/core/src/services/cos/lister.rs
@@ -22,11 +22,15 @@ use quick_xml::de;
 
 use super::core::*;
 use super::error::parse_error;
+use crate::raw::oio::PageContext;
 use crate::raw::*;
 use crate::EntryMode;
+use crate::Error;
 use crate::Metadata;
 use crate::Result;
 
+pub type CosListers = TwoWays<oio::PageLister<CosLister>, oio::PageLister<CosObjectVersionsLister>>;
+
 pub struct CosLister {
     core: Arc<CosCore>,
     path: String,
@@ -95,3 +99,139 @@ impl oio::PageList for CosLister {
         Ok(())
     }
 }
+
+/// refer: https://cloud.tencent.com/document/product/436/35521
+pub struct CosObjectVersionsLister {
+    core: Arc<CosCore>,
+
+    prefix: String,
+    args: OpList,
+
+    delimiter: &'static str,
+    abs_start_after: Option<String>,
+}
+
+impl CosObjectVersionsLister {
+    pub fn new(core: Arc<CosCore>, path: &str, args: OpList) -> Self {
+        let delimiter = if args.recursive() { "" } else { "/" };
+        let abs_start_after = args
+            .start_after()
+            .map(|start_after| build_abs_path(&core.root, start_after));
+
+        Self {
+            core,
+            prefix: path.to_string(),
+            args,
+            delimiter,
+            abs_start_after,
+        }
+    }
+}
+
+impl oio::PageList for CosObjectVersionsLister {
+    async fn next_page(&self, ctx: &mut PageContext) -> Result<()> {
+        let markers = ctx.token.rsplit_once(" ");
+        let (key_marker, version_id_marker) = if let Some(data) = markers {
+            data
+        } else if let Some(start_after) = &self.abs_start_after {
+            (start_after.as_str(), "")
+        } else {
+            ("", "")
+        };
+
+        let resp = self
+            .core
+            .cos_list_object_versions(
+                &self.prefix,
+                self.delimiter,
+                self.args.limit(),
+                key_marker,
+                version_id_marker,
+            )
+            .await?;
+        if resp.status() != http::StatusCode::OK {
+            return Err(parse_error(resp));
+        }
+
+        let body = resp.into_body();
+        let output: ListObjectVersionsOutput = de::from_reader(body.reader())
+            .map_err(new_xml_deserialize_error)
+            // Allow Cos list to retry on XML deserialization errors.
+            //
+            // This is because the Cos list API may return incomplete XML data under high load.
+            // We are confident that our XML decoding logic is correct. When this error occurs,
+            // we allow retries to obtain the correct data.
+            .map_err(Error::set_temporary)?;
+
+        ctx.done = if let Some(is_truncated) = output.is_truncated {
+            !is_truncated
+        } else {
+            false
+        };
+        ctx.token = format!(
+            "{} {}",
+            output.next_key_marker.unwrap_or_default(),
+            output.next_version_id_marker.unwrap_or_default()
+        );
+
+        for prefix in output.common_prefixes {
+            let de = oio::Entry::new(
+                &build_rel_path(&self.core.root, &prefix.prefix),
+                Metadata::new(EntryMode::DIR),
+            );
+            ctx.entries.push_back(de);
+        }
+
+        for version_object in output.version {
+            // `list` must be additive, so we need to include the latest version object
+            // even if `versions` is not enabled.
+            //
+            // Here we skip all non-latest version objects if `versions` is not enabled.
+            if !(self.args.versions() || version_object.is_latest) {
+                continue;
+            }
+
+            let mut path = build_rel_path(&self.core.root, &version_object.key);
+            if path.is_empty() {
+                path = "/".to_owned();
+            }
+
+            let mut meta = Metadata::new(EntryMode::from_path(&path));
+            meta.set_version(&version_object.version_id);
+            meta.set_is_current(version_object.is_latest);
+            meta.set_content_length(version_object.size);
+            meta.set_last_modified(parse_datetime_from_rfc3339(
+                version_object.last_modified.as_str(),
+            )?);
+            if let Some(etag) = version_object.etag {
+                meta.set_etag(&etag);
+                meta.set_content_md5(etag.trim_matches('"'));
+            }
+
+            let entry = oio::Entry::new(&path, meta);
+            ctx.entries.push_back(entry);
+        }
+
+        if self.args.deleted() {
+            for delete_marker in output.delete_marker {
+                let mut path = build_rel_path(&self.core.root, &delete_marker.key);
+                if path.is_empty() {
+                    path = "/".to_owned();
+                }
+
+                let mut meta = Metadata::new(EntryMode::FILE);
+                meta.set_version(&delete_marker.version_id);
+                meta.set_is_deleted(true);
+                meta.set_is_current(delete_marker.is_latest);
+                meta.set_last_modified(parse_datetime_from_rfc3339(
+                    delete_marker.last_modified.as_str(),
+                )?);
+
+                let entry = oio::Entry::new(&path, meta);
+                ctx.entries.push_back(entry);
+            }
+        }
+
+        Ok(())
+    }
+}

Reply via email to