This is an automated email from the ASF dual-hosted git repository.

hoslo pushed a commit to branch gcs-list-with-deleted
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit a76b366c16da1426cbc743d4d78bfb7cf093295e
Author: hoslo <ho...@apache.org>
AuthorDate: Wed Jan 8 21:28:12 2025 +0800

    feat(core): Implement list with deleted and versions for gcs
---
 core/src/services/gcs/backend.rs |  20 ++++++-
 core/src/services/gcs/config.rs  |   2 +
 core/src/services/gcs/core.rs    |  81 +++++++++++++++++++++++--
 core/src/services/gcs/delete.rs  |   8 +--
 core/src/services/gcs/lister.rs  | 125 +++++++++++++++++++++++++++++++++++++++
 5 files changed, 225 insertions(+), 11 deletions(-)
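
For context, a minimal usage sketch of the capability added here, assuming the
builder API from this diff and `list_with` options named after the new
`list_with_versions` / `list_with_deleted` capabilities; the exact option names
in the public `Operator` API may differ:

    use opendal::services::Gcs;
    use opendal::{Operator, Result};

    async fn list_versions_example() -> Result<()> {
        // Hypothetical bucket name; credentials are loaded from the environment
        // by default. `enable_versioning` is the builder knob added by this commit.
        let builder = Gcs::default()
            .bucket("my-bucket")
            .enable_versioning(true);
        let op = Operator::new(builder)?.finish();

        // List current and noncurrent generations under a prefix
        // (assumes a `versions(true)` option on `list_with`).
        let entries = op.list_with("dir/").versions(true).await?;
        for entry in entries {
            println!("{} version={:?}", entry.path(), entry.metadata().version());
        }

        // Also include soft-deleted generations (assumes a `deleted(true)` option).
        let deleted = op.list_with("dir/").deleted(true).await?;
        println!("{} entries including deleted", deleted.len());
        Ok(())
    }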

diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index c61ff75ca..e7b081eb0 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -192,6 +192,13 @@ impl GcsBuilder {
         self
     }
 
+    /// Set bucket versioning status for this backend
+    pub fn enable_versioning(mut self, enabled: bool) -> Self {
+        self.config.enable_versioning = enabled;
+
+        self
+    }
+
     /// Set the predefined acl for GCS.
     ///
     /// Available values are:
@@ -326,6 +333,7 @@ impl Builder for GcsBuilder {
                 predefined_acl: self.config.predefined_acl.clone(),
                 default_storage_class: self.config.default_storage_class.clone(),
                 allow_anonymous: self.config.allow_anonymous,
+                enable_versioning: self.config.enable_versioning,
             }),
         };
 
@@ -362,6 +370,7 @@ impl Access for GcsBackend {
                 stat_has_content_md5: true,
                 stat_has_content_length: true,
                 stat_has_content_type: true,
+                stat_with_version: self.core.enable_versioning,
                 stat_has_last_modified: true,
                 stat_has_user_metadata: true,
 
@@ -369,13 +378,14 @@ impl Access for GcsBackend {
 
                 read_with_if_match: true,
                 read_with_if_none_match: true,
+                read_with_version: self.core.enable_versioning,
 
                 write: true,
                 write_can_empty: true,
                 write_can_multi: true,
                 write_with_content_type: true,
                 write_with_user_metadata: true,
-                write_with_if_not_exists: true,
+                write_with_if_not_exists: !self.core.enable_versioning,
 
                 // The min multipart size of Gcs is 5 MiB.
                 //
@@ -392,6 +402,7 @@ impl Access for GcsBackend {
 
                 delete: true,
                 delete_max_size: Some(100),
+                delete_with_version: self.core.enable_versioning,
                 copy: true,
 
                 list: true,
@@ -403,6 +414,8 @@ impl Access for GcsBackend {
                 list_has_content_length: true,
                 list_has_content_type: true,
                 list_has_last_modified: true,
+                list_with_versions: self.core.enable_versioning,
+                list_with_deleted: self.core.enable_versioning,
 
                 presign: true,
                 presign_stat: true,
@@ -432,6 +445,7 @@ impl Access for GcsBackend {
 
         m.set_etag(&meta.etag);
         m.set_content_md5(&meta.md5_hash);
+        m.set_version(&meta.generation);
 
         let size = meta
             .size
@@ -554,6 +568,10 @@ struct GetObjectJsonResponse {
     ///
     /// For example: `"contentType": "image/png",`
     content_type: String,
+    /// Generation of this object.
+    ///
+    /// For example: `"generation": "1660563214863653"`
+    generation: String,
     /// Custom metadata of this object.
     ///
     /// For example: `"metadata" : { "my-key": "my-value" }`
diff --git a/core/src/services/gcs/config.rs b/core/src/services/gcs/config.rs
index 43ff15175..fb12871dc 100644
--- a/core/src/services/gcs/config.rs
+++ b/core/src/services/gcs/config.rs
@@ -53,6 +53,8 @@ pub struct GcsConfig {
     pub disable_vm_metadata: bool,
     /// Disable loading configuration from the environment.
     pub disable_config_load: bool,
+    /// Enable versioning for the bucket.
+    pub enable_versioning: bool,
     /// A Google Cloud OAuth2 token.
     ///
     /// Takes precedence over `credential` and `credential_path`.
diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs
index 3e06bf03b..021d39753 100644
--- a/core/src/services/gcs/core.rs
+++ b/core/src/services/gcs/core.rs
@@ -51,6 +51,9 @@ pub mod constants {
     pub const X_GOOG_ACL: &str = "x-goog-acl";
     pub const X_GOOG_STORAGE_CLASS: &str = "x-goog-storage-class";
     pub const X_GOOG_META_PREFIX: &str = "x-goog-meta-";
+    pub const IF_GENERATION_MATCH: &str = "ifGenerationMatch";
+    pub const X_GOOG_IF_GENERATION_MATCH: &str = "x-goog-if-generation-match";
+    pub const GENERATION: &str = "generation";
 }
 
 pub struct GcsCore {
@@ -69,6 +72,7 @@ pub struct GcsCore {
     pub default_storage_class: Option<String>,
 
     pub allow_anonymous: bool,
+    pub enable_versioning: bool,
 }
 
 impl Debug for GcsCore {
@@ -193,6 +197,10 @@ impl GcsCore {
 
         let mut req = Request::get(&url);
 
+        if let Some(version) = args.version() {
+            req = req.header(constants::IF_GENERATION_MATCH, version);
+        }
+
         if let Some(if_match) = args.if_match() {
             req = req.header(IF_MATCH, if_match);
         }
@@ -216,6 +224,10 @@ impl GcsCore {
 
         let mut req = Request::get(&url);
 
+        if let Some(version) = args.version() {
+            req = req.header(constants::X_GOOG_IF_GENERATION_MATCH, version);
+        }
+
         if let Some(if_match) = args.if_match() {
             req = req.header(IF_MATCH, if_match);
         }
@@ -372,6 +384,10 @@ impl GcsCore {
 
         let mut req = Request::get(&url);
 
+        if let Some(version) = args.version() {
+            req = req.header(constants::IF_GENERATION_MATCH, version);
+        }
+
         if let Some(if_none_match) = args.if_none_match() {
             req = req.header(IF_NONE_MATCH, if_none_match);
         }
@@ -397,6 +413,10 @@ impl GcsCore {
 
         let mut req = Request::head(&url);
 
+        if let Some(version) = args.version() {
+            req = req.header(constants::X_GOOG_IF_GENERATION_MATCH, version);
+        }
+
         if let Some(if_none_match) = args.if_none_match() {
             req = req.header(IF_NONE_MATCH, if_none_match);
         }
@@ -422,14 +442,14 @@ impl GcsCore {
         self.send(req).await
     }
 
-    pub async fn gcs_delete_object(&self, path: &str) -> Result<Response<Buffer>> {
-        let mut req = self.gcs_delete_object_request(path)?;
+    pub async fn gcs_delete_object(&self, path: &str, args: OpDelete) -> Result<Response<Buffer>> {
+        let mut req = self.gcs_delete_object_request(path, args)?;
 
         self.sign(&mut req).await?;
         self.send(req).await
     }
 
-    pub fn gcs_delete_object_request(&self, path: &str) -> Result<Request<Buffer>> {
+    pub fn gcs_delete_object_request(&self, path: &str, args: OpDelete) -> Result<Request<Buffer>> {
         let p = build_abs_path(&self.root, path);
 
         let url = format!(
@@ -444,13 +464,13 @@ impl GcsCore {
             .map_err(new_request_build_error)
     }
 
-    pub async fn gcs_delete_objects(&self, paths: Vec<String>) -> Result<Response<Buffer>> {
+    pub async fn gcs_delete_objects(&self, batch: Vec<(String, OpDelete)>) -> Result<Response<Buffer>> {
         let uri = format!("{}/batch/storage/v1", self.endpoint);
 
         let mut multipart = Multipart::new();
 
-        for (idx, path) in paths.iter().enumerate() {
-            let req = self.gcs_delete_object_request(path)?;
+        for (idx, (path, args)) in batch.iter().enumerate() {
+            let req = self.gcs_delete_object_request(path, args.clone())?;
 
             multipart = multipart.part(
                 MixedPart::from_request(req).part_header("content-id".parse().unwrap(), idx.into()),
@@ -534,6 +554,54 @@ impl GcsCore {
         self.send(req).await
     }
 
+    pub async fn gcs_list_object_versions(
+        &self,
+        path: &str,
+        page_token: &str,
+        delimiter: &str,
+        limit: Option<usize>,
+        start_after: Option<String>,
+    ) -> Result<Response<Buffer>> {
+        let p = build_abs_path(&self.root, path);
+
+        let mut url = format!(
+            "{}/storage/v1/b/{}/o?versions=true&prefix={}",
+            self.endpoint,
+            self.bucket,
+            percent_encode_path(&p)
+        );
+        if !delimiter.is_empty() {
+            write!(url, "&delimiter={delimiter}").expect("write into string 
must succeed");
+        }
+        if let Some(limit) = limit {
+            write!(url, "&maxResults={limit}").expect("write into string must 
succeed");
+        }
+        if let Some(start_after) = start_after {
+            let start_after = build_abs_path(&self.root, &start_after);
+            write!(url, "&startOffset={}", percent_encode_path(&start_after))
+                .expect("write into string must succeed");
+        }
+
+        if !page_token.is_empty() {
+            // NOTE:
+            //
+            // GCS uses pageToken in request and nextPageToken in response
+            //
+            // We don't know what those tokens look like, so this part is copied
+            // directly from the AWS S3 service.
+            write!(url, "&pageToken={}", percent_encode_path(page_token))
+                .expect("write into string must succeed");
+        }
+
+        let mut req = Request::get(&url)
+            .body(Buffer::new())
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
     pub async fn gcs_initiate_multipart_upload(&self, path: &str) -> Result<Response<Buffer>> {
         let p = build_abs_path(&self.root, path);
 
@@ -681,6 +749,7 @@ pub struct ListResponseItem {
     pub md5_hash: String,
     pub updated: String,
     pub content_type: String,
+    pub time_deleted: Option<String>,
 }
 
 /// Result of CreateMultipartUpload
diff --git a/core/src/services/gcs/delete.rs b/core/src/services/gcs/delete.rs
index 241b6152e..03968fd36 100644
--- a/core/src/services/gcs/delete.rs
+++ b/core/src/services/gcs/delete.rs
@@ -34,8 +34,8 @@ impl GcsDeleter {
 }
 
 impl oio::BatchDelete for GcsDeleter {
-    async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
-        let resp = self.core.gcs_delete_object(&path).await?;
+    async fn delete_once(&self, path: String, args: OpDelete) -> Result<()> {
+        let resp = self.core.gcs_delete_object(&path, args).await?;
 
         // deleting not existing objects is ok
         if resp.status().is_success() || resp.status() == StatusCode::NOT_FOUND {
@@ -46,8 +46,8 @@ impl oio::BatchDelete for GcsDeleter {
     }
 
     async fn delete_batch(&self, batch: Vec<(String, OpDelete)>) -> Result<BatchDeleteResult> {
-        let paths: Vec<String> = batch.into_iter().map(|(p, _)| p).collect();
-        let resp = self.core.gcs_delete_objects(paths.clone()).await?;
+        let paths: Vec<String> = batch.clone().into_iter().map(|(p, _)| p).collect();
+        let resp = self.core.gcs_delete_objects(batch).await?;
 
         let status = resp.status();
 
diff --git a/core/src/services/gcs/lister.rs b/core/src/services/gcs/lister.rs
index cd66e964f..682974f91 100644
--- a/core/src/services/gcs/lister.rs
+++ b/core/src/services/gcs/lister.rs
@@ -18,6 +18,7 @@
 use std::sync::Arc;
 
 use bytes::Buf;
+use quick_xml::se;
 use serde_json;
 
 use super::core::*;
@@ -134,3 +135,127 @@ impl oio::PageList for GcsLister {
         Ok(())
     }
 }
+
+pub struct GcsObjectVersionsLister {
+    core: Arc<GcsCore>,
+
+    path: String,
+    args: OpList,
+    delimiter: &'static str,
+    limit: Option<usize>,
+
+    /// Filter results to objects whose names are lexicographically
+    /// **equal to or after** startOffset
+    start_after: Option<String>,
+}
+
+impl GcsObjectVersionsLister {
+    /// Generate a new directory walker
+    pub fn new(
+        core: Arc<GcsCore>,
+        path: &str,
+        args: OpList,
+        recursive: bool,
+        limit: Option<usize>,
+        start_after: Option<&str>,
+    ) -> Self {
+        let delimiter = if recursive { "" } else { "/" };
+        Self {
+            core,
+
+            path: path.to_string(),
+            args,
+            delimiter,
+            limit,
+            start_after: start_after.map(String::from),
+        }
+    }
+}
+
+impl oio::PageList for GcsObjectVersionsLister {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
+        let resp = self
+            .core
+            .gcs_list_object_versions(
+                &self.path,
+                &ctx.token,
+                self.delimiter,
+                self.limit,
+                if ctx.token.is_empty() {
+                    self.start_after.clone()
+                } else {
+                    None
+                },
+            )
+            .await?;
+
+        if !resp.status().is_success() {
+            return Err(parse_error(resp));
+        }
+        let bytes = resp.into_body();
+
+        let output: ListResponse =
+            serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
+
+        if let Some(token) = &output.next_page_token {
+            ctx.token.clone_from(token);
+        } else {
+            ctx.done = true;
+        }
+
+        for prefix in output.prefixes {
+            let de = oio::Entry::new(
+                &build_rel_path(&self.core.root, &prefix),
+                Metadata::new(EntryMode::DIR),
+            );
+
+            ctx.entries.push_back(de);
+        }
+
+        let mut item_map = std::collections::HashMap::new();
+        for object in output.items {
+            // exclude the inclusive start_after itself
+            let mut path = build_rel_path(&self.core.root, &object.name);
+            if path.is_empty() {
+                path = "/".to_string();
+            }
+            if self.start_after.as_ref() == Some(&path) {
+                continue;
+            }
+
+            let mut meta = Metadata::new(EntryMode::from_path(&path));
+
+            // set metadata fields
+            meta.set_content_md5(object.md5_hash.as_str());
+            meta.set_etag(object.etag.as_str());
+
+            let size = object.size.parse().map_err(|e| {
+                Error::new(ErrorKind::Unexpected, "parse u64 from list response").set_source(e)
+            })?;
+            meta.set_content_length(size);
+            if !object.content_type.is_empty() {
+                meta.set_content_type(&object.content_type);
+            }
+
+            meta.set_last_modified(parse_datetime_from_rfc3339(object.updated.as_str())?);
+            if object.time_deleted.is_some() {
+                meta.set_is_deleted(true);
+            } else {
+                meta.set_is_current(true);
+            }
+
+            item_map.insert(path, meta);
+        }
+        for (path, meta) in item_map {
+            // `list` must be additive, so we need to include the latest version object
+            //
+            // If `deleted` is true, we include all deleted objects.
+            if (self.args.deleted() && meta.is_deleted()) || meta.is_current() == Some(true) {
+                let de = oio::Entry::with(path, meta);
+                ctx.entries.push_back(de);
+            }
+        }
+
+        Ok(())
+    }
+}
