This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new ab9f0a844 feat(services/gcs): Add batch delete support (#2142)
ab9f0a844 is described below

commit ab9f0a844076ece16f31a023e713f0dfc48262d5
Author: congyi wang <[email protected]>
AuthorDate: Thu Jun 8 08:30:50 2023 +0800

    feat(services/gcs): Add batch delete support (#2142)
    
    * support batch delete for gcs
    
    * implement batch for gcs via json
    
    * minor
    
    * fix comments
    
    * add endpoint
    
    * rebase main
    
    * Fix batch request build
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix build
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Implement batch delete
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Remove not used error
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
    Co-authored-by: Xuanwo <[email protected]>
---
 core/src/raw/http_util/multipart.rs | 19 ++++++++----
 core/src/services/gcs/backend.rs    | 61 +++++++++++++++++++++++++++++++++++++
 core/src/services/gcs/core.rs       | 32 +++++++++++++++++--
 3 files changed, 103 insertions(+), 9 deletions(-)

diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs
index d364a4416..d27829e78 100644
--- a/core/src/raw/http_util/multipart.rs
+++ b/core/src/raw/http_util/multipart.rs
@@ -20,6 +20,7 @@ use std::str::FromStr;
 
 use bytes::Bytes;
 use bytes::BytesMut;
+use futures::stream;
 use http::header::CONTENT_DISPOSITION;
 use http::header::CONTENT_LENGTH;
 use http::header::CONTENT_TYPE;
@@ -36,6 +37,7 @@ use http::Version;
 
 use super::new_request_build_error;
 use super::AsyncBody;
+use super::IncomingAsyncBody;
 use crate::*;
 
 /// Multipart is a builder for multipart/form-data.
@@ -72,6 +74,11 @@ impl<T: Part> Multipart<T> {
         self
     }
 
+    /// Into parts.
+    pub fn into_parts(self) -> Vec<T> {
+        self.parts
+    }
+
     /// Parse a response with multipart body into Multipart.
     pub fn parse(mut self, bs: Bytes) -> Result<Self> {
         let s = String::from_utf8(bs.to_vec()).map_err(|err| {
@@ -87,7 +94,7 @@ impl<T: Part> Multipart<T> {
             .collect::<Vec<&str>>();
 
         for part in parts {
-            if part.is_empty() || part == "--" {
+            if part.is_empty() || part.starts_with("--") {
                 continue;
             }
 
@@ -300,7 +307,7 @@ impl MixedPart {
     }
 
     /// Consume a mixed part to build a response.
-    pub fn into_response(mut self) -> Response<AsyncBody> {
+    pub fn into_response(mut self) -> Response<IncomingAsyncBody> {
         let mut builder = Response::builder();
 
         builder = builder.status(self.status_code.unwrap_or(StatusCode::OK));
@@ -308,10 +315,10 @@ impl MixedPart {
         // Swap headers directly instead of copy the entire map.
         mem::swap(builder.headers_mut().unwrap(), &mut self.headers);
 
-        let body = match self.content.is_empty() {
-            true => AsyncBody::Empty,
-            false => AsyncBody::Bytes(self.content),
-        };
+        let bs: Bytes = self.content;
+        let length = bs.len();
+        let body =
+            IncomingAsyncBody::new(Box::new(stream::iter(vec![Ok(bs)])), Some(length as u64));
 
         builder
             .body(body)
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 08062a450..93161637a 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -427,6 +427,8 @@ impl Accessor for GcsBackend {
                 list_with_delimiter_slash: true,
                 list_without_delimiter: true,
 
+                batch: true,
+                batch_max_operations: Some(100),
                 presign: true,
                 presign_stat: true,
                 presign_read: true,
@@ -557,6 +559,65 @@ impl Accessor for GcsBackend {
         ))
     }
 
+    async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
+        let ops = args.into_operation();
+        if ops.len() > 100 {
+            return Err(Error::new(
+                ErrorKind::Unsupported,
+                "gcs services only allow delete less than 100 keys at once",
+            )
+            .with_context("length", ops.len().to_string()));
+        }
+
+        let paths: Vec<String> = ops.into_iter().map(|(p, _)| p).collect();
+        let resp = self.core.gcs_delete_objects(paths.clone()).await?;
+
+        let status = resp.status();
+
+        if let StatusCode::OK = status {
+            let content_type = parse_content_type(resp.headers())?.ok_or_else(|| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    "gcs batch delete response content type is empty",
+                )
+            })?;
+            let boundary = content_type
+                .strip_prefix("multipart/mixed; boundary=")
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::Unexpected,
+                        "gcs batch delete response content type is not multipart/mixed",
+                    )
+                })?
+                .trim_matches('"');
+            let multipart: Multipart<MixedPart> = Multipart::new()
+                .with_boundary(boundary)
+                .parse(resp.into_body().bytes().await?)?;
+            let parts = multipart.into_parts();
+
+            let mut batched_result = Vec::with_capacity(parts.len());
+
+            for (i, part) in parts.into_iter().enumerate() {
+                let resp = part.into_response();
+                // TODO: maybe we can take it directly?
+                let path = paths[i].clone();
+
+                // deleting not existing objects is ok
+                if resp.status().is_success() || resp.status() == StatusCode::NOT_FOUND {
+                    batched_result.push((path, Ok(RpDelete::default().into())));
+                } else {
+                    batched_result.push((path, Err(parse_error(resp).await?)));
+                }
+            }
+
+            Ok(RpBatch::new(batched_result))
+        } else {
+            // If the overall request isn't formatted correctly and Cloud Storage is unable to parse it into sub-requests, you receive a 400 error.
+            // Otherwise, Cloud Storage returns a 200 status code, even if some or all of the sub-requests fail.
+            Err(parse_error(resp).await?)
+        }
+    }
+
     async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
         // We will not send this request out, just for signing.
         let mut req = match args.operation() {
diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs
index 7ebe25dbe..e29402db7 100644
--- a/core/src/services/gcs/core.rs
+++ b/core/src/services/gcs/core.rs
@@ -371,6 +371,13 @@ impl GcsCore {
     }
 
     pub async fn gcs_delete_object(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+        let mut req = self.gcs_delete_object_request(path)?;
+
+        self.sign(&mut req).await?;
+        self.send(req).await
+    }
+
+    pub fn gcs_delete_object_request(&self, path: &str) -> Result<Request<AsyncBody>> {
         let p = build_abs_path(&self.root, path);
 
         let url = format!(
@@ -380,12 +387,31 @@ impl GcsCore {
             percent_encode_path(&p)
         );
 
-        let mut req = Request::delete(&url)
+        Request::delete(&url)
             .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
+            .map_err(new_request_build_error)
+    }
 
-        self.sign(&mut req).await?;
+    pub async fn gcs_delete_objects(
+        &self,
+        paths: Vec<String>,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let uri = format!("{}/batch/storage/v1", self.endpoint);
+
+        let mut multipart = Multipart::new();
+
+        for (idx, path) in paths.iter().enumerate() {
+            let req = self.gcs_delete_object_request(path)?;
 
+            multipart = multipart.part(
+                MixedPart::from_request(req).part_header("content-id".parse().unwrap(), idx.into()),
+            );
+        }
+
+        let req = Request::post(uri);
+        let mut req = multipart.apply(req)?;
+
+        self.sign(&mut req).await?;
         self.send(req).await
     }
 

Reply via email to