This is an automated email from the ASF dual-hosted git repository. hoslo pushed a commit to branch gcs-list-with-deleted in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 30544ce215973c10b357e096a3a3ca59182b4d45 Author: Xuanwo <[email protected]> AuthorDate: Wed Jan 8 21:28:12 2025 +0800 feat(core): Implement list with deleted and versions for gcs --- core/Cargo.toml | 2 +- core/src/services/gcs/backend.rs | 5 ++ core/src/services/gcs/core.rs | 31 ++++++++-- core/src/services/gcs/delete.rs | 8 +-- core/src/services/gcs/lister.rs | 125 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 160 insertions(+), 11 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index f75fb8d8c..ba71b8629 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -47,7 +47,7 @@ rust-version = "1.75" version = "0.51.1" [features] -default = ["reqwest/rustls-tls", "executors-tokio", "services-memory"] +default = ["reqwest/rustls-tls", "executors-tokio", "services-memory", "services-cos", "services-gcs"] # Build test utils or not. # diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs index c61ff75ca..0faeb2d1c 100644 --- a/core/src/services/gcs/backend.rs +++ b/core/src/services/gcs/backend.rs @@ -432,6 +432,7 @@ impl Access for GcsBackend { m.set_etag(&meta.etag); m.set_content_md5(&meta.md5_hash); + m.set_version(&meta.generation); let size = meta .size @@ -554,6 +555,10 @@ struct GetObjectJsonResponse { /// /// For example: `"contentType": "image/png",` content_type: String, + /// Generation of this object. + /// + /// For example: `"generation": "1660563214863653"` + generation: String, /// Custom metadata of this object. /// /// For example: `"metadata" : { "my-key": "my-value" }` diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs index 3e06bf03b..f850edee9 100644 --- a/core/src/services/gcs/core.rs +++ b/core/src/services/gcs/core.rs @@ -51,6 +51,9 @@ pub mod constants { pub const X_GOOG_ACL: &str = "x-goog-acl"; pub const X_GOOG_STORAGE_CLASS: &str = "x-goog-storage-class"; pub const X_GOOG_META_PREFIX: &str = "x-goog-meta-"; + pub const IF_GENERATION_MATCH: &str = "ifGenerationMatch"; + pub const X_GOOG_IF_GENERATION_MATCH: &str = "x-goog-if-generation-match"; + pub const GENERATION: &str = "generation"; } pub struct GcsCore { @@ -193,6 +196,10 @@ impl GcsCore { let mut req = Request::get(&url); + if let Some(version) = args.version() { + req = req.header(constants::IF_GENERATION_MATCH, version); + } + if let Some(if_match) = args.if_match() { req = req.header(IF_MATCH, if_match); } @@ -216,6 +223,10 @@ impl GcsCore { let mut req = Request::get(&url); + if let Some(version) = args.version() { + req = req.header(constants::X_GOOG_IF_GENERATION_MATCH, version); + } + if let Some(if_match) = args.if_match() { req = req.header(IF_MATCH, if_match); } @@ -372,6 +383,10 @@ impl GcsCore { let mut req = Request::get(&url); + if let Some(version) = args.version() { + req = req.header(constants::IF_GENERATION_MATCH, version); + } + if let Some(if_none_match) = args.if_none_match() { req = req.header(IF_NONE_MATCH, if_none_match); } @@ -397,6 +412,10 @@ impl GcsCore { let mut req = Request::head(&url); + if let Some(version) = args.version() { + req = req.header(constants::X_GOOG_IF_GENERATION_MATCH, version); + } + if let Some(if_none_match) = args.if_none_match() { req = req.header(IF_NONE_MATCH, if_none_match); } @@ -422,14 +441,14 @@ impl GcsCore { self.send(req).await } - pub async fn gcs_delete_object(&self, path: &str) -> Result<Response<Buffer>> { - let mut req = self.gcs_delete_object_request(path)?; + pub async fn gcs_delete_object(&self, path: &str, args: OpDelete) -> Result<Response<Buffer>> { + let mut req = self.gcs_delete_object_request(path, args)?; self.sign(&mut req).await?; self.send(req).await } - pub fn gcs_delete_object_request(&self, path: &str) -> Result<Request<Buffer>> { + pub fn gcs_delete_object_request(&self, path: &str, args: OpDelete) -> Result<Request<Buffer>> { let p = build_abs_path(&self.root, path); let url = format!( @@ -444,13 +463,13 @@ impl GcsCore { .map_err(new_request_build_error) } - pub async fn gcs_delete_objects(&self, paths: Vec<String>) -> Result<Response<Buffer>> { + pub async fn gcs_delete_objects(&self, batch: Vec<(String, OpDelete)>) -> Result<Response<Buffer>> { let uri = format!("{}/batch/storage/v1", self.endpoint); let mut multipart = Multipart::new(); - for (idx, path) in paths.iter().enumerate() { - let req = self.gcs_delete_object_request(path)?; + for (idx, (path, args)) in batch.iter().enumerate() { + let req = self.gcs_delete_object_request(path, args.clone())?; multipart = multipart.part( MixedPart::from_request(req).part_header("content-id".parse().unwrap(), idx.into()), diff --git a/core/src/services/gcs/delete.rs b/core/src/services/gcs/delete.rs index 241b6152e..03968fd36 100644 --- a/core/src/services/gcs/delete.rs +++ b/core/src/services/gcs/delete.rs @@ -34,8 +34,8 @@ impl GcsDeleter { } impl oio::BatchDelete for GcsDeleter { - async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> { - let resp = self.core.gcs_delete_object(&path).await?; + async fn delete_once(&self, path: String, args: OpDelete) -> Result<()> { + let resp = self.core.gcs_delete_object(&path, args).await?; // deleting not existing objects is ok if resp.status().is_success() || resp.status() == StatusCode::NOT_FOUND { @@ -46,8 +46,8 @@ impl oio::BatchDelete for GcsDeleter { } async fn delete_batch(&self, batch: Vec<(String, OpDelete)>) -> Result<BatchDeleteResult> { - let paths: Vec<String> = batch.into_iter().map(|(p, _)| p).collect(); - let resp = self.core.gcs_delete_objects(paths.clone()).await?; + let paths: Vec<String> = batch.clone().into_iter().map(|(p, _)| p).collect(); + let resp = self.core.gcs_delete_objects(batch).await?; let status = resp.status(); diff --git a/core/src/services/gcs/lister.rs b/core/src/services/gcs/lister.rs index cd66e964f..682974f91 100644 --- a/core/src/services/gcs/lister.rs +++ b/core/src/services/gcs/lister.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use bytes::Buf; +use quick_xml::se; use serde_json; use super::core::*; @@ -134,3 +135,127 @@ impl oio::PageList for GcsLister { Ok(()) } } + +pub struct GcsObjectVersionsLister { + core: Arc<GcsCore>, + + path: String, + args: OpList, + delimiter: &'static str, + limit: Option<usize>, + + /// Filter results to objects whose names are lexicographically + /// **equal to or after** startOffset + start_after: Option<String>, +} + +impl GcsObjectVersionsLister { + /// Generate a new directory walker + pub fn new( + core: Arc<GcsCore>, + path: &str, + args: OpList, + recursive: bool, + limit: Option<usize>, + start_after: Option<&str>, + ) -> Self { + let delimiter = if recursive { "" } else { "/" }; + Self { + core, + + path: path.to_string(), + args, + delimiter, + limit, + start_after: start_after.map(String::from), + } + } +} + +impl oio::PageList for GcsObjectVersionsLister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { + let resp = self + .core + .gcs_list_object_versions( + &self.path, + &ctx.token, + self.delimiter, + self.limit, + if ctx.token.is_empty() { + self.start_after.clone() + } else { + None + }, + ) + .await?; + + if !resp.status().is_success() { + return Err(parse_error(resp)); + } + let bytes = resp.into_body(); + + let output: ListResponse = + serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?; + + if let Some(token) = &output.next_page_token { + ctx.token.clone_from(token); + } else { + ctx.done = true; + } + + for prefix in output.prefixes { + let de = oio::Entry::new( + &build_rel_path(&self.core.root, &prefix), + Metadata::new(EntryMode::DIR), + ); + + ctx.entries.push_back(de); + } + + let mut item_map = std::collections::HashMap::new(); + for object in output.items { + // exclude the inclusive start_after itself + let mut path = build_rel_path(&self.core.root, &object.name); + if path.is_empty() { + path = "/".to_string(); + } + if self.start_after.as_ref() == Some(&path) { + continue; + } + + let mut meta = Metadata::new(EntryMode::from_path(&path)); + + // set metadata fields + meta.set_content_md5(object.md5_hash.as_str()); + meta.set_etag(object.etag.as_str()); + + let size = object.size.parse().map_err(|e| { + Error::new(ErrorKind::Unexpected, "parse u64 from list response").set_source(e) + })?; + meta.set_content_length(size); + if !object.content_type.is_empty() { + meta.set_content_type(&object.content_type); + } + + meta.set_last_modified(parse_datetime_from_rfc3339(object.updated.as_str())?); + if object.time_deleted.is_some() { + meta.set_is_deleted(true); + } else { + meta.set_is_current(true); + } + + item_map.insert(path, meta); + } + for (path, meta) in item_map { + // `list` must be additive, so we need to include the latest version object + // + // If `deleted` is true, we include all deleted objects. + if (self.args.deleted() && meta.is_deleted()) || meta.is_current() == Some(true) { + let de = oio::Entry::with(path, meta); + ctx.entries.push_back(de); + } + } + + Ok(()) + } +}
