This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch multipart-parser in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 40c6790b7c7b6dfa10e5199cf99e1589f545b795 Author: Xuanwo <[email protected]> AuthorDate: Wed Jun 7 20:59:04 2023 +0800 Implement Signed-off-by: Xuanwo <[email protected]> --- core/src/raw/http_util/multipart.rs | 414 +++++++++++++++++++++++++++++++----- 1 file changed, 355 insertions(+), 59 deletions(-) diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs index b9d5a6db6..d364a4416 100644 --- a/core/src/raw/http_util/multipart.rs +++ b/core/src/raw/http_util/multipart.rs @@ -15,11 +15,9 @@ // specific language governing permissions and limitations // under the License. -use std::io::BufRead; use std::mem; use std::str::FromStr; -use bytes::Buf; use bytes::Bytes; use bytes::BytesMut; use http::header::CONTENT_DISPOSITION; @@ -31,6 +29,8 @@ use http::HeaderName; use http::HeaderValue; use http::Method; use http::Request; +use http::Response; +use http::StatusCode; use http::Uri; use http::Version; @@ -61,8 +61,7 @@ impl<T: Part> Multipart<T> { } /// Set the boundary with given string. - #[cfg(test)] - fn with_boundary(mut self, boundary: &str) -> Self { + pub fn with_boundary(mut self, boundary: &str) -> Self { self.boundary = boundary.to_string(); self } @@ -73,6 +72,31 @@ impl<T: Part> Multipart<T> { self } + /// Parse a response with multipart body into Multipart. + pub fn parse(mut self, bs: Bytes) -> Result<Self> { + let s = String::from_utf8(bs.to_vec()).map_err(|err| { + Error::new( + ErrorKind::Unexpected, + "multipart response contains invalid utf-8 chars", + ) + .set_source(err) + })?; + + let parts = s + .split(format!("--{}", self.boundary).as_str()) + .collect::<Vec<&str>>(); + + for part in parts { + if part.is_empty() || part == "--" { + continue; + } + + self.parts.push(T::parse(part)?); + } + + Ok(self) + } + pub(crate) fn build(&self) -> Bytes { let mut bs = BytesMut::new(); @@ -126,7 +150,7 @@ pub trait Part: Sized { fn format(&self) -> Bytes; /// parse will parse the bytes into a part. - fn parse(bs: Bytes) -> Result<Self>; + fn parse(s: &str) -> Result<Self>; } /// FormDataPart is a builder for multipart/form-data part. @@ -191,7 +215,7 @@ impl Part for FormDataPart { bs.freeze() } - fn parse(_: Bytes) -> Result<Self> { + fn parse(_: &str) -> Result<Self> { Err(Error::new( ErrorKind::Unsupported, "parse of form-data is not supported", @@ -201,14 +225,21 @@ impl Part for FormDataPart { /// MixedPart is a builder for multipart/mixed part. #[derive(Debug)] +#[cfg_attr(test, derive(Eq, PartialEq))] pub struct MixedPart { part_headers: HeaderMap, - method: Method, - uri: Uri, + /// Common version: Version, headers: HeaderMap, content: Bytes, + + /// Request only + method: Option<Method>, + uri: Option<Uri>, + + /// Response only + status_code: Option<StatusCode>, } impl MixedPart { @@ -222,19 +253,19 @@ impl MixedPart { Self { part_headers, - method: Method::GET, - uri, + version: Version::HTTP_11, headers: HeaderMap::new(), content: Bytes::new(), + + uri: Some(uri), + method: None, + + status_code: None, } } /// Build a mixed part from a request. - /// - /// # Notes - /// - /// Mixed parts only takes the path from the request uri. pub fn from_request(req: Request<AsyncBody>) -> Self { let mut part_headers = HeaderMap::new(); part_headers.insert(CONTENT_TYPE, "application/http".parse().unwrap()); @@ -249,21 +280,44 @@ impl MixedPart { Self { part_headers, - method: parts.method, - uri: Uri::from_str( - parts - .uri - .path_and_query() - .unwrap_or(&PathAndQuery::from_static("/")) - .as_str(), - ) - .expect("the uri used to build a mixed part must be valid"), + uri: Some( + Uri::from_str( + parts + .uri + .path_and_query() + .unwrap_or(&PathAndQuery::from_static("/")) + .as_str(), + ) + .expect("the uri used to build a mixed part must be valid"), + ), version: parts.version, headers: parts.headers, content, + + method: Some(parts.method), + status_code: None, } } + /// Consume a mixed part to build a response. + pub fn into_response(mut self) -> Response<AsyncBody> { + let mut builder = Response::builder(); + + builder = builder.status(self.status_code.unwrap_or(StatusCode::OK)); + builder = builder.version(self.version); + // Swap headers directly instead of copy the entire map. + mem::swap(builder.headers_mut().unwrap(), &mut self.headers); + + let body = match self.content.is_empty() { + true => AsyncBody::Empty, + false => AsyncBody::Bytes(self.content), + }; + + builder + .body(body) + .expect("mixed part must be valid response") + } + /// Insert a part header into part. pub fn part_header(mut self, key: HeaderName, value: HeaderValue) -> Self { self.part_headers.insert(key, value); @@ -272,7 +326,7 @@ impl MixedPart { /// Set the method for request in this part. pub fn method(mut self, method: Method) -> Self { - self.method = method; + self.method = Some(method); self } @@ -329,9 +383,21 @@ impl Part for MixedPart { // Write request line: `DELETE /container0/blob0 HTTP/1.1` bs.extend_from_slice(b"\r\n"); - bs.extend_from_slice(self.method.as_str().as_bytes()); + bs.extend_from_slice( + self.method + .as_ref() + .expect("mixed part must be a valid request that contains method") + .as_str() + .as_bytes(), + ); bs.extend_from_slice(b" "); - bs.extend_from_slice(self.uri.path().as_bytes()); + bs.extend_from_slice( + self.uri + .as_ref() + .expect("mixed part must be a valid request that contains uri") + .path() + .as_bytes(), + ); bs.extend_from_slice(b" "); bs.extend_from_slice(format!("{:?}", self.version).as_bytes()); bs.extend_from_slice(b"\r\n"); @@ -354,45 +420,89 @@ impl Part for MixedPart { bs.freeze() } - fn parse(bs: Bytes) -> Result<Self> { - let mut r = bs.reader(); - let mut buf = String::new(); + /// TODO + /// + /// This is a simple implementation and have a lot of space to improve. + fn parse(s: &str) -> Result<Self> { + let parts = s.splitn(2, "\r\n\r\n").collect::<Vec<&str>>(); + let part_headers_content = parts[0]; + let http_response = parts.get(1).unwrap_or(&""); let mut part_headers = HeaderMap::new(); - loop { - let n = r.read_line(&mut buf)?; - // Read an unexpected EOF - if n == 0 { - return Err( - Error::new(ErrorKind::Unexpected, "unexpected end of multipart") - .with_operation("Multipart::parse"), - ); - } - // Read a `\r\n`, which means the end of headers. - if n == 1 && buf == "\r" { - break; - } - // Read a `\n`, keep going. - if n == 1 { - continue; + for line in part_headers_content.lines() { + let parts = line.splitn(2, ": ").collect::<Vec<&str>>(); + if parts.len() == 2 { + let header_name = HeaderName::from_str(parts[0]).map_err(|err| { + Error::new( + ErrorKind::Unexpected, + "multipart response contains invalid part header name", + ) + .set_source(err) + })?; + let header_value = parts[1].parse().map_err(|err| { + Error::new( + ErrorKind::Unexpected, + "multipart response contains invalid part header value", + ) + .set_source(err) + })?; + + part_headers.insert(header_name, header_value); } + } - let (header_name, header_value) = buf.split_once(": ").ok_or({ - Error::new(ErrorKind::Unexpected, "invalid header format") - .with_operation("Multipart::parse") - })?; - part_headers.insert( - HeaderName::from_str(header_name).map_err(|err| { - Error::new(ErrorKind::Unexpected, "invalid header name").set_source(err) - })?, - HeaderValue::from_str(header_value).map_err(|err| { - Error::new(ErrorKind::Unexpected, "invalid header value").set_source(err) - })?, - ); + let parts = http_response.split("\r\n\r\n").collect::<Vec<&str>>(); + let headers_content = parts[0]; + let body_content = parts.get(1).unwrap_or(&""); + + let status_line = headers_content.lines().next().unwrap_or(""); + let status_code = status_line + .split_whitespace() + .nth(1) + .unwrap_or("") + .parse::<u16>() + .unwrap_or(200); + + let mut headers = HeaderMap::new(); + for line in headers_content.lines().skip(1) { + let parts = line.splitn(2, ": ").collect::<Vec<&str>>(); + if parts.len() == 2 { + let header_name = HeaderName::from_str(parts[0]).map_err(|err| { + Error::new( + ErrorKind::Unexpected, + "multipart response contains invalid part header name", + ) + .set_source(err) + })?; + let header_value = parts[1].parse().map_err(|err| { + Error::new( + ErrorKind::Unexpected, + "multipart response contains invalid part header value", + ) + .set_source(err) + })?; - buf.clear(); + headers.insert(header_name, header_value); + } } - todo!() + + Ok(Self { + part_headers, + version: Version::HTTP_11, + headers, + content: Bytes::from(body_content.to_string()), + + method: None, + uri: None, + + status_code: Some(StatusCode::from_u16(status_code).map_err(|err| { + Error::new( + ErrorKind::Unexpected, + "multipart response contains invalid status code", + ) + .set_source(err) + })?), + }) } } @@ -709,4 +819,190 @@ content-length: 0 .replace("\r\n", "\n") ); } + + /// This test is inspired by <https://cloud.google.com/storage/docs/batch> + #[test] + fn test_multipart_mixed_gcs_batch_metadata_response() { + let response = r#"--batch_pK7JBAk73-E=_AA5eFwv4m2Q= +Content-Type: application/http +Content-ID: <response-b29c5de2-0db4-490b-b421-6a51b598bd22+1> + +HTTP/1.1 200 OK +ETag: "lGaP-E0memYDumK16YuUDM_6Gf0/V43j6azD55CPRGb9b6uytDYl61Y" +Content-Type: application/json; charset=UTF-8 +Date: Mon, 22 Jan 2018 18:56:00 GMT +Expires: Mon, 22 Jan 2018 18:56:00 GMT +Cache-Control: private, max-age=0 +Content-Length: 846 + +{"kind": "storage#object","id": "example-bucket/obj1/1495822576643790","metadata": {"type": "tabby"}} + +--batch_pK7JBAk73-E=_AA5eFwv4m2Q= +Content-Type: application/http +Content-ID: <response-b29c5de2-0db4-490b-b421-6a51b598bd22+2> + +HTTP/1.1 200 OK +ETag: "lGaP-E0memYDumK16YuUDM_6Gf0/91POdd-sxSAkJnS8Dm7wMxBSDKk" +Content-Type: application/json; charset=UTF-8 +Date: Mon, 22 Jan 2018 18:56:00 GMT +Expires: Mon, 22 Jan 2018 18:56:00 GMT +Cache-Control: private, max-age=0 +Content-Length: 846 + +{"kind": "storage#object","id": "example-bucket/obj2/1495822576643790","metadata": {"type": "tuxedo"}} + +--batch_pK7JBAk73-E=_AA5eFwv4m2Q= +Content-Type: application/http +Content-ID: <response-b29c5de2-0db4-490b-b421-6a51b598bd22+3> + +HTTP/1.1 200 OK +ETag: "lGaP-E0memYDumK16YuUDM_6Gf0/d2Z1F1_ZVbB1dC0YKM9rX5VAgIQ" +Content-Type: application/json; charset=UTF-8 +Date: Mon, 22 Jan 2018 18:56:00 GMT +Expires: Mon, 22 Jan 2018 18:56:00 GMT +Cache-Control: private, max-age=0 +Content-Length: 846 + +{"kind": "storage#object","id": "example-bucket/obj3/1495822576643790","metadata": {"type": "calico"}} + +--batch_pK7JBAk73-E=_AA5eFwv4m2Q=--"#.replace('\n', "\r\n"); + + let multipart: Multipart<MixedPart> = Multipart::new() + .with_boundary("batch_pK7JBAk73-E=_AA5eFwv4m2Q=") + .parse(Bytes::from(response)) + .unwrap(); + + assert_eq!(multipart.parts.len(), 3); + assert_eq!( + multipart.parts[0], + MixedPart { + part_headers: { + let mut h = HeaderMap::new(); + h.insert("Content-Type", "application/http".parse().unwrap()); + h.insert( + "Content-ID", + "<response-b29c5de2-0db4-490b-b421-6a51b598bd22+1>" + .parse() + .unwrap(), + ); + + h + }, + version: Version::HTTP_11, + headers: { + let mut h = HeaderMap::new(); + h.insert( + "ETag", + "\"lGaP-E0memYDumK16YuUDM_6Gf0/V43j6azD55CPRGb9b6uytDYl61Y\"" + .parse() + .unwrap(), + ); + h.insert( + "Content-Type", + "application/json; charset=UTF-8".parse().unwrap(), + ); + h.insert("Date", "Mon, 22 Jan 2018 18:56:00 GMT".parse().unwrap()); + h.insert("Expires", "Mon, 22 Jan 2018 18:56:00 GMT".parse().unwrap()); + h.insert("Cache-Control", "private, max-age=0".parse().unwrap()); + h.insert("Content-Length", "846".parse().unwrap()); + + h + }, + content: Bytes::from_static( + r#"{"kind": "storage#object","id": "example-bucket/obj1/1495822576643790","metadata": {"type": "tabby"}}"# + .as_bytes() + ), + uri: None, + method: None, + status_code: Some(StatusCode::from_u16(200).unwrap()) + } + ); + assert_eq!( + multipart.parts[1], + MixedPart { + part_headers: { + let mut h = HeaderMap::new(); + h.insert("Content-Type", "application/http".parse().unwrap()); + h.insert( + "Content-ID", + "<response-b29c5de2-0db4-490b-b421-6a51b598bd22+2>" + .parse() + .unwrap(), + ); + + h + }, + version: Version::HTTP_11, + headers: { + let mut h = HeaderMap::new(); + h.insert( + "ETag", + "\"lGaP-E0memYDumK16YuUDM_6Gf0/91POdd-sxSAkJnS8Dm7wMxBSDKk\"" + .parse() + .unwrap(), + ); + h.insert( + "Content-Type", + "application/json; charset=UTF-8".parse().unwrap(), + ); + h.insert("Date", "Mon, 22 Jan 2018 18:56:00 GMT".parse().unwrap()); + h.insert("Expires", "Mon, 22 Jan 2018 18:56:00 GMT".parse().unwrap()); + h.insert("Cache-Control", "private, max-age=0".parse().unwrap()); + h.insert("Content-Length", "846".parse().unwrap()); + + h + }, + content: Bytes::from_static( + r#"{"kind": "storage#object","id": "example-bucket/obj2/1495822576643790","metadata": {"type": "tuxedo"}}"# + .as_bytes() + ), + uri: None, + method: None, + status_code: Some(StatusCode::from_u16(200).unwrap()) + } + ); + assert_eq!( + multipart.parts[2], + MixedPart { + part_headers: { + let mut h = HeaderMap::new(); + h.insert("Content-Type", "application/http".parse().unwrap()); + h.insert( + "Content-ID", + "<response-b29c5de2-0db4-490b-b421-6a51b598bd22+3>" + .parse() + .unwrap(), + ); + + h + }, + version: Version::HTTP_11, + headers: { + let mut h = HeaderMap::new(); + h.insert( + "ETag", + "\"lGaP-E0memYDumK16YuUDM_6Gf0/d2Z1F1_ZVbB1dC0YKM9rX5VAgIQ\"" + .parse() + .unwrap(), + ); + h.insert( + "Content-Type", + "application/json; charset=UTF-8".parse().unwrap(), + ); + h.insert("Date", "Mon, 22 Jan 2018 18:56:00 GMT".parse().unwrap()); + h.insert("Expires", "Mon, 22 Jan 2018 18:56:00 GMT".parse().unwrap()); + h.insert("Cache-Control", "private, max-age=0".parse().unwrap()); + h.insert("Content-Length", "846".parse().unwrap()); + + h + }, + content: Bytes::from_static( + r#"{"kind": "storage#object","id": "example-bucket/obj3/1495822576643790","metadata": {"type": "calico"}}"# + .as_bytes() + ), + uri: None, + method: None, + status_code: Some(StatusCode::from_u16(200).unwrap()) + }); + } }
