This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new da6b45f61 refactor(services/azblob): instead
`parse_batch_delete_response` with `Multipart::parse` (#3071)
da6b45f61 is described below
commit da6b45f61a1f0ce33459034ac6b57bf1c8a8da36
Author: G-XD <[email protected]>
AuthorDate: Thu Sep 14 23:05:39 2023 +0800
refactor(services/azblob): instead `parse_batch_delete_response` with
`Multipart::parse` (#3071)
* refactor(services/azblob): instead parse_batch_delete_response with
Multipart::parse
* refactor(services/azblob): del unused func
---
core/src/services/azblob/backend.rs | 34 +++++---
core/src/services/azblob/batch.rs | 166 ------------------------------------
core/src/services/azblob/error.rs | 22 -----
core/src/services/azblob/mod.rs | 1 -
4 files changed, 23 insertions(+), 200 deletions(-)
diff --git a/core/src/services/azblob/backend.rs
b/core/src/services/azblob/backend.rs
index 7e5fe259e..3ba390c13 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -32,7 +32,6 @@ use reqsign::AzureStorageSigner;
use sha2::Digest;
use sha2::Sha256;
-use super::batch::parse_batch_delete_response;
use super::error::parse_error;
use super::pager::AzblobPager;
use super::writer::AzblobWriter;
@@ -742,18 +741,31 @@ impl Accessor for AzblobBackend {
)
})?;
- let body = resp.into_body().bytes().await?;
- let body = String::from_utf8(body.to_vec()).map_err(|e| {
- Error::new(
+ let multipart: Multipart<MixedPart> = Multipart::new()
+ .with_boundary(boundary)
+ .parse(resp.into_body().bytes().await?)?;
+ let parts = multipart.into_parts();
+
+ if paths.len() != parts.len() {
+ return Err(Error::new(
ErrorKind::Unexpected,
- &format!("get invalid batch response {e:?}"),
- )
- })?;
+ "invalid batch response, paths and response parts don't match",
+ ));
+ }
+
+ let mut results = Vec::with_capacity(parts.len());
+
+ for (i, part) in parts.into_iter().enumerate() {
+ let resp = part.into_response();
+ let path = paths[i].clone();
- let results = parse_batch_delete_response(boundary, body, paths)?
- .into_iter()
- .map(|(path, rp)| (path, rp.map(|v| v.into())))
- .collect();
+ // deleting not existing objects is ok
+ if resp.status() == StatusCode::ACCEPTED || resp.status() ==
StatusCode::NOT_FOUND {
+ results.push((path, Ok(RpDelete::default().into())));
+ } else {
+ results.push((path, Err(parse_error(resp).await?)));
+ }
+ }
Ok(RpBatch::new(results))
}
}
diff --git a/core/src/services/azblob/batch.rs
b/core/src/services/azblob/batch.rs
deleted file mode 100644
index c6004be02..000000000
--- a/core/src/services/azblob/batch.rs
+++ /dev/null
@@ -1,166 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use http::StatusCode;
-
-use super::error::parse_http_error;
-use crate::raw::*;
-use crate::*;
-
-pub(super) fn parse_batch_delete_response(
- boundary: &str,
- body: String,
- expect: Vec<String>,
-) -> Result<Vec<(String, Result<RpDelete>)>> {
- let mut reps = Vec::with_capacity(expect.len());
-
- let mut resp_packs: Vec<&str> =
body.trim().split(&format!("--{boundary}")).collect();
- if resp_packs.len() != (expect.len() + 2) {
- return Err(Error::new(
- ErrorKind::Unexpected,
- "invalid batch delete response",
- ));
- }
- // drop the tail
- resp_packs.pop();
- for (resp_pack, name) in resp_packs[1..].iter().zip(expect.into_iter()) {
- // the http body use CRLF (\r\n) instead of LF (\n)
- // split the body at double CRLF
- let split: Vec<&str> = resp_pack.split("\r\n\r\n").collect();
-
- let header: Vec<&str> = split
- .get(1)
- .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Empty item in
batch response"))?
- .trim()
- .split_ascii_whitespace()
- .collect();
-
- let status_code = header
- .get(1)
- .ok_or_else(|| {
- Error::new(
- ErrorKind::Unexpected,
- "cannot find status code of HTTP response item!",
- )
- })?
- .parse::<u16>()
- .map_err(|e| {
- Error::new(
- ErrorKind::Unexpected,
- &format!("invalid status code: {:?}", e),
- )
- })?
- .try_into()
- .map_err(|e| {
- Error::new(
- ErrorKind::Unexpected,
- &format!("invalid status code: {:?}", e),
- )
- })?;
-
- let rep = match status_code {
- StatusCode::ACCEPTED | StatusCode::NOT_FOUND => (name,
Ok(RpDelete::default())),
- s => {
- let body = split.get(1).ok_or_else(|| {
- Error::new(ErrorKind::Unexpected, "Empty HTTP error
response")
- })?;
- let err = parse_http_error(s, body)?;
- (name, Err(err))
- }
- };
- reps.push(rep)
- }
- Ok(reps)
-}
-
-#[cfg(test)]
-mod test {
- use super::*;
-
- #[test]
- fn test_break_down_batch() {
- // the last item in batch is a mocked response.
- // if stronger validation is implemented for Azblob,
- // feel free to replace or remove it.
- let body = r#"--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed
-Content-Type: application/http
-Content-ID: 0
-
-HTTP/1.1 202 Accepted
-x-ms-delete-type-permanent: true
-x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e284f
-x-ms-version: 2018-11-09
-
---batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed
-Content-Type: application/http
-Content-ID: 1
-
-HTTP/1.1 202 Accepted
-x-ms-delete-type-permanent: true
-x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e2851
-x-ms-version: 2018-11-09
-
---batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed
-Content-Type: application/http
-Content-ID: 2
-
-HTTP/1.1 404 The specified blob does not exist.
-x-ms-error-code: BlobNotFound
-x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e2852
-x-ms-version: 2018-11-09
-Content-Length: 216
-Content-Type: application/xml
-
-<?xml version="1.0" encoding="utf-8"?>
-<Error><Code>BlobNotFound</Code><Message>The specified blob does not exist.
-RequestId:778fdc83-801e-0000-62ff-0334671e2852
-Time:2018-06-14T16:46:54.6040685Z</Message></Error>
-
---batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed
-Content-Type: application/http
-Content-ID: 3
-
-HTTP/1.1 403 Request to blob forbidden
-x-ms-error-code: BlobForbidden
-x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e2852
-x-ms-version: 2018-11-09
-Content-Length: 216
-Content-Type: application/xml
-
-<?xml version="1.0" encoding="utf-8"?>
-<Error><Code>BlobNotFound</Code><Message>The specified blob does not exist.
-RequestId:778fdc83-801e-0000-62ff-0334671e2852
-Time:2018-06-14T16:46:54.6040685Z</Message></Error>
---batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed--"#
- .replace('\n', "\r\n");
-
- let expected: Vec<_> = (0..=3).map(|n|
format!("/to-del/{n}")).collect();
- let boundary = "batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed";
- let p =
- parse_batch_delete_response(boundary, body,
expected.clone()).expect("must_success");
- assert_eq!(p.len(), expected.len());
- for (idx, ((del, rep), to_del)) in
p.into_iter().zip(expected.into_iter()).enumerate() {
- assert_eq!(del, to_del);
-
- if idx != 3 {
- assert!(rep.is_ok());
- } else {
- assert!(rep.is_err());
- }
- }
- }
-}
diff --git a/core/src/services/azblob/error.rs
b/core/src/services/azblob/error.rs
index 80049d571..58b744134 100644
--- a/core/src/services/azblob/error.rs
+++ b/core/src/services/azblob/error.rs
@@ -60,28 +60,6 @@ impl Debug for AzblobError {
}
}
-pub fn parse_http_error(status: StatusCode, body: &str) -> Result<Error> {
- let (kind, retryable) = match status {
- StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
- StatusCode::INTERNAL_SERVER_ERROR
- | StatusCode::BAD_GATEWAY
- | StatusCode::SERVICE_UNAVAILABLE
- | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true),
- _ => (ErrorKind::Unexpected, false),
- };
- let message = match de::from_str::<AzblobError>(body) {
- Ok(err) => format!("{err:?}"),
- Err(_) => body.to_string(),
- };
- let mut err = Error::new(kind, &message).with_context("response",
body.to_string());
-
- if retryable {
- err = err.set_temporary();
- }
-
- Ok(err)
-}
-
/// Parse error response into Error.
pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
let (parts, body) = resp.into_parts();
diff --git a/core/src/services/azblob/mod.rs b/core/src/services/azblob/mod.rs
index 7edcfde3d..8170b176b 100644
--- a/core/src/services/azblob/mod.rs
+++ b/core/src/services/azblob/mod.rs
@@ -18,7 +18,6 @@
mod backend;
pub use backend::AzblobBuilder as Azblob;
-mod batch;
mod core;
mod error;
mod pager;