This is an automated email from the ASF dual-hosted git repository.

hoslo pushed a commit to branch gcs-list-with-deleted
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit 30544ce215973c10b357e096a3a3ca59182b4d45
Author: Xuanwo <[email protected]>
AuthorDate: Wed Jan 8 21:28:12 2025 +0800

    feat(core): Implement list with deleted and versions for gcs
---
 core/Cargo.toml                  |   2 +-
 core/src/services/gcs/backend.rs |   5 ++
 core/src/services/gcs/core.rs    |  31 ++++++++--
 core/src/services/gcs/delete.rs  |   8 +--
 core/src/services/gcs/lister.rs  | 125 +++++++++++++++++++++++++++++++++++++++
 5 files changed, 160 insertions(+), 11 deletions(-)

diff --git a/core/Cargo.toml b/core/Cargo.toml
index f75fb8d8c..ba71b8629 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -47,7 +47,7 @@ rust-version = "1.75"
 version = "0.51.1"
 
 [features]
-default = ["reqwest/rustls-tls", "executors-tokio", "services-memory"]
+default = ["reqwest/rustls-tls", "executors-tokio", "services-memory", "services-cos", "services-gcs"]
 
 # Build test utils or not.
 #
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index c61ff75ca..0faeb2d1c 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -432,6 +432,7 @@ impl Access for GcsBackend {
 
         m.set_etag(&meta.etag);
         m.set_content_md5(&meta.md5_hash);
+        m.set_version(&meta.generation);
 
         let size = meta
             .size
@@ -554,6 +555,10 @@ struct GetObjectJsonResponse {
     ///
     /// For example: `"contentType": "image/png",`
     content_type: String,
+    /// Generation of this object.
+    ///
+    /// For example: `"generation": "1660563214863653"`
+    generation: String,
     /// Custom metadata of this object.
     ///
     /// For example: `"metadata" : { "my-key": "my-value" }`
diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs
index 3e06bf03b..f850edee9 100644
--- a/core/src/services/gcs/core.rs
+++ b/core/src/services/gcs/core.rs
@@ -51,6 +51,9 @@ pub mod constants {
     pub const X_GOOG_ACL: &str = "x-goog-acl";
     pub const X_GOOG_STORAGE_CLASS: &str = "x-goog-storage-class";
     pub const X_GOOG_META_PREFIX: &str = "x-goog-meta-";
+    pub const IF_GENERATION_MATCH: &str = "ifGenerationMatch";
+    pub const X_GOOG_IF_GENERATION_MATCH: &str = "x-goog-if-generation-match";
+    pub const GENERATION: &str = "generation";
 }
 
 pub struct GcsCore {
@@ -193,6 +196,10 @@ impl GcsCore {
 
         let mut req = Request::get(&url);
 
+        if let Some(version) = args.version() {
+            req = req.header(constants::IF_GENERATION_MATCH, version);
+        }
+
         if let Some(if_match) = args.if_match() {
             req = req.header(IF_MATCH, if_match);
         }
@@ -216,6 +223,10 @@ impl GcsCore {
 
         let mut req = Request::get(&url);
 
+        if let Some(version) = args.version() {
+            req = req.header(constants::X_GOOG_IF_GENERATION_MATCH, version);
+        }
+
         if let Some(if_match) = args.if_match() {
             req = req.header(IF_MATCH, if_match);
         }
@@ -372,6 +383,10 @@ impl GcsCore {
 
         let mut req = Request::get(&url);
 
+        if let Some(version) = args.version() {
+            req = req.header(constants::IF_GENERATION_MATCH, version);
+        }
+
         if let Some(if_none_match) = args.if_none_match() {
             req = req.header(IF_NONE_MATCH, if_none_match);
         }
@@ -397,6 +412,10 @@ impl GcsCore {
 
         let mut req = Request::head(&url);
 
+        if let Some(version) = args.version() {
+            req = req.header(constants::X_GOOG_IF_GENERATION_MATCH, version);
+        }
+
         if let Some(if_none_match) = args.if_none_match() {
             req = req.header(IF_NONE_MATCH, if_none_match);
         }
@@ -422,14 +441,14 @@ impl GcsCore {
         self.send(req).await
     }
 
-    pub async fn gcs_delete_object(&self, path: &str) -> Result<Response<Buffer>> {
-        let mut req = self.gcs_delete_object_request(path)?;
+    pub async fn gcs_delete_object(&self, path: &str, args: OpDelete) -> Result<Response<Buffer>> {
+        let mut req = self.gcs_delete_object_request(path, args)?;
 
         self.sign(&mut req).await?;
         self.send(req).await
     }
 
-    pub fn gcs_delete_object_request(&self, path: &str) -> Result<Request<Buffer>> {
+    pub fn gcs_delete_object_request(&self, path: &str, args: OpDelete) -> Result<Request<Buffer>> {
         let p = build_abs_path(&self.root, path);
 
         let url = format!(
@@ -444,13 +463,13 @@ impl GcsCore {
             .map_err(new_request_build_error)
     }
 
-    pub async fn gcs_delete_objects(&self, paths: Vec<String>) -> Result<Response<Buffer>> {
+    pub async fn gcs_delete_objects(&self, batch: Vec<(String, OpDelete)>) -> Result<Response<Buffer>> {
         let uri = format!("{}/batch/storage/v1", self.endpoint);
 
         let mut multipart = Multipart::new();
 
-        for (idx, path) in paths.iter().enumerate() {
-            let req = self.gcs_delete_object_request(path)?;
+        for (idx, (path, args)) in batch.iter().enumerate() {
+            let req = self.gcs_delete_object_request(path, args.clone())?;
 
             multipart = multipart.part(
                 MixedPart::from_request(req).part_header("content-id".parse().unwrap(), idx.into()),
diff --git a/core/src/services/gcs/delete.rs b/core/src/services/gcs/delete.rs
index 241b6152e..03968fd36 100644
--- a/core/src/services/gcs/delete.rs
+++ b/core/src/services/gcs/delete.rs
@@ -34,8 +34,8 @@ impl GcsDeleter {
 }
 
 impl oio::BatchDelete for GcsDeleter {
-    async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
-        let resp = self.core.gcs_delete_object(&path).await?;
+    async fn delete_once(&self, path: String, args: OpDelete) -> Result<()> {
+        let resp = self.core.gcs_delete_object(&path, args).await?;
 
         // deleting not existing objects is ok
         if resp.status().is_success() || resp.status() == StatusCode::NOT_FOUND {
@@ -46,8 +46,8 @@ impl oio::BatchDelete for GcsDeleter {
     }
 
     async fn delete_batch(&self, batch: Vec<(String, OpDelete)>) -> Result<BatchDeleteResult> {
-        let paths: Vec<String> = batch.into_iter().map(|(p, _)| p).collect();
-        let resp = self.core.gcs_delete_objects(paths.clone()).await?;
+        let paths: Vec<String> = batch.clone().into_iter().map(|(p, _)| p).collect();
+        let resp = self.core.gcs_delete_objects(batch).await?;
 
         let status = resp.status();
 
diff --git a/core/src/services/gcs/lister.rs b/core/src/services/gcs/lister.rs
index cd66e964f..682974f91 100644
--- a/core/src/services/gcs/lister.rs
+++ b/core/src/services/gcs/lister.rs
@@ -18,6 +18,7 @@
 use std::sync::Arc;
 
 use bytes::Buf;
+use quick_xml::se;
 use serde_json;
 
 use super::core::*;
@@ -134,3 +135,127 @@ impl oio::PageList for GcsLister {
         Ok(())
     }
 }
+
+pub struct GcsObjectVersionsLister {
+    core: Arc<GcsCore>,
+
+    path: String,
+    args: OpList,
+    delimiter: &'static str,
+    limit: Option<usize>,
+
+    /// Filter results to objects whose names are lexicographically
+    /// **equal to or after** startOffset
+    start_after: Option<String>,
+}
+
+impl GcsObjectVersionsLister {
+    /// Generate a new directory walker
+    pub fn new(
+        core: Arc<GcsCore>,
+        path: &str,
+        args: OpList,
+        recursive: bool,
+        limit: Option<usize>,
+        start_after: Option<&str>,
+    ) -> Self {
+        let delimiter = if recursive { "" } else { "/" };
+        Self {
+            core,
+
+            path: path.to_string(),
+            args,
+            delimiter,
+            limit,
+            start_after: start_after.map(String::from),
+        }
+    }
+}
+
+impl oio::PageList for GcsObjectVersionsLister {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
+        let resp = self
+            .core
+            .gcs_list_object_versions(
+                &self.path,
+                &ctx.token,
+                self.delimiter,
+                self.limit,
+                if ctx.token.is_empty() {
+                    self.start_after.clone()
+                } else {
+                    None
+                },
+            )
+            .await?;
+
+        if !resp.status().is_success() {
+            return Err(parse_error(resp));
+        }
+        let bytes = resp.into_body();
+
+        let output: ListResponse =
+            serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
+
+        if let Some(token) = &output.next_page_token {
+            ctx.token.clone_from(token);
+        } else {
+            ctx.done = true;
+        }
+
+        for prefix in output.prefixes {
+            let de = oio::Entry::new(
+                &build_rel_path(&self.core.root, &prefix),
+                Metadata::new(EntryMode::DIR),
+            );
+
+            ctx.entries.push_back(de);
+        }
+
+        let mut item_map = std::collections::HashMap::new();
+        for object in output.items {
+            // exclude the inclusive start_after itself
+            let mut path = build_rel_path(&self.core.root, &object.name);
+            if path.is_empty() {
+                path = "/".to_string();
+            }
+            if self.start_after.as_ref() == Some(&path) {
+                continue;
+            }
+
+            let mut meta = Metadata::new(EntryMode::from_path(&path));
+
+            // set metadata fields
+            meta.set_content_md5(object.md5_hash.as_str());
+            meta.set_etag(object.etag.as_str());
+
+            let size = object.size.parse().map_err(|e| {
+                Error::new(ErrorKind::Unexpected, "parse u64 from list response").set_source(e)
+            })?;
+            meta.set_content_length(size);
+            if !object.content_type.is_empty() {
+                meta.set_content_type(&object.content_type);
+            }
+
+            meta.set_last_modified(parse_datetime_from_rfc3339(object.updated.as_str())?);
+            if object.time_deleted.is_some() {
+                meta.set_is_deleted(true);
+            } else {
+                meta.set_is_current(true);
+            }
+
+            item_map.insert(path, meta);
+        }
+        for (path, meta) in item_map {
+            // `list` must be additive, so we need to include the latest version object
+            //
+            // If `deleted` is true, we include all deleted objects.
+            if (self.args.deleted() && meta.is_deleted()) || meta.is_current() == Some(true) {
+                let de = oio::Entry::with(path, meta);
+                ctx.entries.push_back(de);
+            }
+        }
+
+        Ok(())
+    }
+}

Reply via email to