This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new ab9f0a844 feat(services/gcs): Add batch delete support (#2142)
ab9f0a844 is described below
commit ab9f0a844076ece16f31a023e713f0dfc48262d5
Author: congyi wang <[email protected]>
AuthorDate: Thu Jun 8 08:30:50 2023 +0800
feat(services/gcs): Add batch delete support (#2142)
* support batch delete for gcs
* implement batch for gcs via json
* minor
* fix comments
* add endpoint
* rebase main
* Fix batch request build
Signed-off-by: Xuanwo <[email protected]>
* Fix build
Signed-off-by: Xuanwo <[email protected]>
* Implement batch delete
Signed-off-by: Xuanwo <[email protected]>
* Remove not used error
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
Co-authored-by: Xuanwo <[email protected]>
---
core/src/raw/http_util/multipart.rs | 19 ++++++++----
core/src/services/gcs/backend.rs | 61 +++++++++++++++++++++++++++++++++++++
core/src/services/gcs/core.rs | 32 +++++++++++++++++--
3 files changed, 103 insertions(+), 9 deletions(-)
diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs
index d364a4416..d27829e78 100644
--- a/core/src/raw/http_util/multipart.rs
+++ b/core/src/raw/http_util/multipart.rs
@@ -20,6 +20,7 @@ use std::str::FromStr;
use bytes::Bytes;
use bytes::BytesMut;
+use futures::stream;
use http::header::CONTENT_DISPOSITION;
use http::header::CONTENT_LENGTH;
use http::header::CONTENT_TYPE;
@@ -36,6 +37,7 @@ use http::Version;
use super::new_request_build_error;
use super::AsyncBody;
+use super::IncomingAsyncBody;
use crate::*;
/// Multipart is a builder for multipart/form-data.
@@ -72,6 +74,11 @@ impl<T: Part> Multipart<T> {
self
}
+ /// Into parts.
+ pub fn into_parts(self) -> Vec<T> {
+ self.parts
+ }
+
/// Parse a response with multipart body into Multipart.
pub fn parse(mut self, bs: Bytes) -> Result<Self> {
let s = String::from_utf8(bs.to_vec()).map_err(|err| {
@@ -87,7 +94,7 @@ impl<T: Part> Multipart<T> {
.collect::<Vec<&str>>();
for part in parts {
- if part.is_empty() || part == "--" {
+ if part.is_empty() || part.starts_with("--") {
continue;
}
@@ -300,7 +307,7 @@ impl MixedPart {
}
/// Consume a mixed part to build a response.
- pub fn into_response(mut self) -> Response<AsyncBody> {
+ pub fn into_response(mut self) -> Response<IncomingAsyncBody> {
let mut builder = Response::builder();
builder = builder.status(self.status_code.unwrap_or(StatusCode::OK));
@@ -308,10 +315,10 @@ impl MixedPart {
// Swap headers directly instead of copy the entire map.
mem::swap(builder.headers_mut().unwrap(), &mut self.headers);
- let body = match self.content.is_empty() {
- true => AsyncBody::Empty,
- false => AsyncBody::Bytes(self.content),
- };
+ let bs: Bytes = self.content;
+ let length = bs.len();
+ let body =
+ IncomingAsyncBody::new(Box::new(stream::iter(vec![Ok(bs)])), Some(length as u64));
builder
.body(body)
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 08062a450..93161637a 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -427,6 +427,8 @@ impl Accessor for GcsBackend {
list_with_delimiter_slash: true,
list_without_delimiter: true,
+ batch: true,
+ batch_max_operations: Some(100),
presign: true,
presign_stat: true,
presign_read: true,
@@ -557,6 +559,65 @@ impl Accessor for GcsBackend {
))
}
+ async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
+ let ops = args.into_operation();
+ if ops.len() > 100 {
+ return Err(Error::new(
+ ErrorKind::Unsupported,
+ "gcs services only allow delete less than 100 keys at once",
+ )
+ .with_context("length", ops.len().to_string()));
+ }
+
+ let paths: Vec<String> = ops.into_iter().map(|(p, _)| p).collect();
+ let resp = self.core.gcs_delete_objects(paths.clone()).await?;
+
+ let status = resp.status();
+
+ if let StatusCode::OK = status {
+ let content_type = parse_content_type(resp.headers())?.ok_or_else(|| {
+ Error::new(
+ ErrorKind::Unexpected,
+ "gcs batch delete response content type is empty",
+ )
+ })?;
+ let boundary = content_type
+ .strip_prefix("multipart/mixed; boundary=")
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::Unexpected,
+ "gcs batch delete response content type is not multipart/mixed",
+ )
+ })?
+ .trim_matches('"');
+ let multipart: Multipart<MixedPart> = Multipart::new()
+ .with_boundary(boundary)
+ .parse(resp.into_body().bytes().await?)?;
+ let parts = multipart.into_parts();
+
+ let mut batched_result = Vec::with_capacity(parts.len());
+
+ for (i, part) in parts.into_iter().enumerate() {
+ let resp = part.into_response();
+ // TODO: maybe we can take it directly?
+ let path = paths[i].clone();
+
+ // deleting not existing objects is ok
+ if resp.status().is_success() || resp.status() == StatusCode::NOT_FOUND {
+ batched_result.push((path, Ok(RpDelete::default().into())));
+ } else {
+ batched_result.push((path, Err(parse_error(resp).await?)));
+ }
+ }
+
+ Ok(RpBatch::new(batched_result))
+ } else {
+ // If the overall request isn't formatted correctly and Cloud Storage is unable to parse it into sub-requests, you receive a 400 error.
+ // Otherwise, Cloud Storage returns a 200 status code, even if some or all of the sub-requests fail.
+ Err(parse_error(resp).await?)
+ }
+ }
+
async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
// We will not send this request out, just for signing.
let mut req = match args.operation() {
diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs
index 7ebe25dbe..e29402db7 100644
--- a/core/src/services/gcs/core.rs
+++ b/core/src/services/gcs/core.rs
@@ -371,6 +371,13 @@ impl GcsCore {
}
pub async fn gcs_delete_object(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+ let mut req = self.gcs_delete_object_request(path)?;
+
+ self.sign(&mut req).await?;
+ self.send(req).await
+ }
+
+ pub fn gcs_delete_object_request(&self, path: &str) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);
let url = format!(
@@ -380,12 +387,31 @@ impl GcsCore {
percent_encode_path(&p)
);
- let mut req = Request::delete(&url)
+ Request::delete(&url)
.body(AsyncBody::Empty)
- .map_err(new_request_build_error)?;
+ .map_err(new_request_build_error)
+ }
- self.sign(&mut req).await?;
+ pub async fn gcs_delete_objects(
+ &self,
+ paths: Vec<String>,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let uri = format!("{}/batch/storage/v1", self.endpoint);
+
+ let mut multipart = Multipart::new();
+
+ for (idx, path) in paths.iter().enumerate() {
+ let req = self.gcs_delete_object_request(path)?;
+ multipart = multipart.part(
+ MixedPart::from_request(req).part_header("content-id".parse().unwrap(), idx.into()),
+ );
+ }
+
+ let req = Request::post(uri);
+ let mut req = multipart.apply(req)?;
+
+ self.sign(&mut req).await?;
self.send(req).await
}