This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch multipart-parser
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 40c6790b7c7b6dfa10e5199cf99e1589f545b795
Author: Xuanwo <[email protected]>
AuthorDate: Wed Jun 7 20:59:04 2023 +0800

    Implement
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/http_util/multipart.rs | 414 +++++++++++++++++++++++++++++++-----
 1 file changed, 355 insertions(+), 59 deletions(-)

diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs
index b9d5a6db6..d364a4416 100644
--- a/core/src/raw/http_util/multipart.rs
+++ b/core/src/raw/http_util/multipart.rs
@@ -15,11 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::io::BufRead;
 use std::mem;
 use std::str::FromStr;
 
-use bytes::Buf;
 use bytes::Bytes;
 use bytes::BytesMut;
 use http::header::CONTENT_DISPOSITION;
@@ -31,6 +29,8 @@ use http::HeaderName;
 use http::HeaderValue;
 use http::Method;
 use http::Request;
+use http::Response;
+use http::StatusCode;
 use http::Uri;
 use http::Version;
 
@@ -61,8 +61,7 @@ impl<T: Part> Multipart<T> {
     }
 
     /// Set the boundary with given string.
-    #[cfg(test)]
-    fn with_boundary(mut self, boundary: &str) -> Self {
+    pub fn with_boundary(mut self, boundary: &str) -> Self { // Public so callers can parse a body whose boundary is already known (see the GCS batch test).
         self.boundary = boundary.to_string();
         self
     }
@@ -73,6 +72,31 @@ impl<T: Part> Multipart<T> {
         self
     }
 
+    /// Parse a response with multipart body into Multipart, appending each part found between boundary delimiters to `self.parts`.
+    pub fn parse(mut self, bs: Bytes) -> Result<Self> {
+        let s = String::from_utf8(bs.to_vec()).map_err(|err| { // NOTE(review): copies the whole body and fails on any non-UTF-8 part content; `std::str::from_utf8(&bs)` would at least avoid the copy.
+            Error::new(
+                ErrorKind::Unexpected,
+                "multipart response contains invalid utf-8 chars",
+            )
+            .set_source(err)
+        })?;
+
+        let parts = s
+            .split(format!("--{}", self.boundary).as_str()) // Matches `--boundary` anywhere, not only at line starts; the last piece is whatever follows the final delimiter.
+            .collect::<Vec<&str>>();
+
+        for part in parts {
+            if part.is_empty() || part == "--" { // NOTE(review): a CRLF after the close delimiter leaves "--\r\n", and a non-empty preamble is kept; both reach `T::parse`.
+                continue;
+            }
+
+            self.parts.push(T::parse(part)?);
+        }
+
+        Ok(self)
+    }
+
     pub(crate) fn build(&self) -> Bytes {
         let mut bs = BytesMut::new();
 
@@ -126,7 +150,7 @@ pub trait Part: Sized {
     fn format(&self) -> Bytes;
 
     /// parse will parse the bytes into a part.
-    fn parse(bs: Bytes) -> Result<Self>;
+    fn parse(s: &str) -> Result<Self>;
 }
 
 /// FormDataPart is a builder for multipart/form-data part.
@@ -191,7 +215,7 @@ impl Part for FormDataPart {
         bs.freeze()
     }
 
-    fn parse(_: Bytes) -> Result<Self> {
+    fn parse(_: &str) -> Result<Self> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "parse of form-data is not supported",
@@ -201,14 +225,21 @@ impl Part for FormDataPart {
 
 /// MixedPart is a builder for multipart/mixed part.
 #[derive(Debug)]
+#[cfg_attr(test, derive(Eq, PartialEq))] // Lets the tests compare parsed parts with `assert_eq!`.
 pub struct MixedPart {
     part_headers: HeaderMap,
 
-    method: Method,
-    uri: Uri,
+    /// Common
     version: Version,
     headers: HeaderMap,
     content: Bytes,
+
+    /// Request only
+    method: Option<Method>, // NOTE(review): `MixedPart::new` now leaves this `None` (it used to default to GET), and `format` panics on `None`.
+    uri: Option<Uri>, // NOTE(review): `format` writes only `uri.path()`, so a query kept by `from_request` is not emitted.
+
+    /// Response only
+    status_code: Option<StatusCode>, // Set by `Part::parse`; `into_response` falls back to 200 OK when `None`.
 }
 
 impl MixedPart {
@@ -222,19 +253,19 @@ impl MixedPart {
 
         Self {
             part_headers,
-            method: Method::GET,
-            uri,
+
             version: Version::HTTP_11,
             headers: HeaderMap::new(),
             content: Bytes::new(),
+
+            uri: Some(uri),
+            method: None,
+
+            status_code: None,
         }
     }
 
     /// Build a mixed part from a request.
-    ///
-    /// # Notes
-    ///
-    /// Mixed parts only takes the path from the request uri.
     pub fn from_request(req: Request<AsyncBody>) -> Self {
         let mut part_headers = HeaderMap::new();
         part_headers.insert(CONTENT_TYPE, "application/http".parse().unwrap());
@@ -249,21 +280,44 @@ impl MixedPart {
 
         Self {
             part_headers,
-            method: parts.method,
-            uri: Uri::from_str(
-                parts
-                    .uri
-                    .path_and_query()
-                    .unwrap_or(&PathAndQuery::from_static("/"))
-                    .as_str(),
-            )
-            .expect("the uri used to build a mixed part must be valid"),
+            uri: Some(
+                Uri::from_str(
+                    parts
+                        .uri
+                        .path_and_query()
+                        .unwrap_or(&PathAndQuery::from_static("/"))
+                        .as_str(),
+                )
+                .expect("the uri used to build a mixed part must be valid"),
+            ),
             version: parts.version,
             headers: parts.headers,
             content,
+
+            method: Some(parts.method),
+            status_code: None,
         }
     }
 
+    /// Consume a mixed part to build a response.
+    pub fn into_response(mut self) -> Response<AsyncBody> {
+        let mut builder = Response::builder();
+
+        builder = builder.status(self.status_code.unwrap_or(StatusCode::OK)); // Parts without a status code (e.g. request parts) become 200 OK.
+        builder = builder.version(self.version);
+        // Swap the headers in directly instead of copying the entire map.
+        mem::swap(builder.headers_mut().unwrap(), &mut self.headers); // `headers_mut` is `None` only if an earlier builder call failed; status and version are already-typed values here.
+
+        let body = match self.content.is_empty() {
+            true => AsyncBody::Empty,
+            false => AsyncBody::Bytes(self.content),
+        };
+
+        builder
+            .body(body)
+            .expect("mixed part must be valid response")
+    }
+
     /// Insert a part header into part.
     pub fn part_header(mut self, key: HeaderName, value: HeaderValue) -> Self {
         self.part_headers.insert(key, value);
@@ -272,7 +326,7 @@ impl MixedPart {
 
     /// Set the method for request in this part.
     pub fn method(mut self, method: Method) -> Self {
-        self.method = method;
+        self.method = Some(method); // Required before `format`, which panics when no method is set.
         self
     }
 
@@ -329,9 +383,21 @@ impl Part for MixedPart {
 
         // Write request line: `DELETE /container0/blob0 HTTP/1.1`
         bs.extend_from_slice(b"\r\n");
-        bs.extend_from_slice(self.method.as_str().as_bytes());
+        bs.extend_from_slice(
+            self.method
+                .as_ref()
+                .expect("mixed part must be a valid request that contains 
method")
+                .as_str()
+                .as_bytes(),
+        );
         bs.extend_from_slice(b" ");
-        bs.extend_from_slice(self.uri.path().as_bytes());
+        bs.extend_from_slice(
+            self.uri
+                .as_ref()
+                .expect("mixed part must be a valid request that contains uri")
+                .path()
+                .as_bytes(),
+        );
         bs.extend_from_slice(b" ");
         bs.extend_from_slice(format!("{:?}", self.version).as_bytes());
         bs.extend_from_slice(b"\r\n");
@@ -354,45 +420,89 @@ impl Part for MixedPart {
         bs.freeze()
     }
 
-    fn parse(bs: Bytes) -> Result<Self> {
-        let mut r = bs.reader();
-        let mut buf = String::new();
+    /// TODO: harden this parser. Parses one part body: part headers, a blank line, then an embedded HTTP response (status line, headers, blank line, body).
+    ///
+    /// This is a simple implementation and has a lot of room for improvement.
+    fn parse(s: &str) -> Result<Self> {
+        let parts = s.splitn(2, "\r\n\r\n").collect::<Vec<&str>>(); // `s` starts with the CRLF that follows the delimiter, so the first header line below is empty.
+        let part_headers_content = parts[0];
+        let http_response = parts.get(1).unwrap_or(&""); // Empty when the part contains no blank line at all.
 
         let mut part_headers = HeaderMap::new();
-        loop {
-            let n = r.read_line(&mut buf)?;
-            // Read an unexpected EOF
-            if n == 0 {
-                return Err(
-                    Error::new(ErrorKind::Unexpected, "unexpected end of 
multipart")
-                        .with_operation("Multipart::parse"),
-                );
-            }
-            // Read a `\r\n`, which means the end of headers.
-            if n == 1 && buf == "\r" {
-                break;
-            }
-            // Read a `\n`, keep going.
-            if n == 1 {
-                continue;
+        for line in part_headers_content.lines() {
+            let parts = line.splitn(2, ": ").collect::<Vec<&str>>(); // NOTE(review): lines without ": " (e.g. "Name:value") are silently skipped.
+            if parts.len() == 2 {
+                let header_name = HeaderName::from_str(parts[0]).map_err(|err| 
{
+                    Error::new(
+                        ErrorKind::Unexpected,
+                        "multipart response contains invalid part header name",
+                    )
+                    .set_source(err)
+                })?;
+                let header_value = parts[1].parse().map_err(|err| {
+                    Error::new(
+                        ErrorKind::Unexpected,
+                        "multipart response contains invalid part header 
value",
+                    )
+                    .set_source(err)
+                })?;
+
+                part_headers.insert(header_name, header_value); // NOTE(review): `insert` keeps only the last value of a repeated header; `append` keeps all.
             }
+        }
 
-            let (header_name, header_value) = buf.split_once(": ").ok_or({
-                Error::new(ErrorKind::Unexpected, "invalid header format")
-                    .with_operation("Multipart::parse")
-            })?;
-            part_headers.insert(
-                HeaderName::from_str(header_name).map_err(|err| {
-                    Error::new(ErrorKind::Unexpected, "invalid header 
name").set_source(err)
-                })?,
-                HeaderValue::from_str(header_value).map_err(|err| {
-                    Error::new(ErrorKind::Unexpected, "invalid header 
value").set_source(err)
-                })?,
-            );
+        let parts = http_response.split("\r\n\r\n").collect::<Vec<&str>>(); // NOTE(review): `split`, not `splitn(2, ..)`: a body containing "\r\n\r\n" is truncated there. The GCS test relies on this to drop the blank line before the next delimiter.
+        let headers_content = parts[0];
+        let body_content = parts.get(1).unwrap_or(&"");
+
+        let status_line = headers_content.lines().next().unwrap_or("");
+        let status_code = status_line
+            .split_whitespace()
+            .nth(1)
+            .unwrap_or("")
+            .parse::<u16>()
+            .unwrap_or(200); // NOTE(review): a missing or non-numeric status silently becomes 200; only values outside 100..=999 reach the `from_u16` error below.
+
+        let mut headers = HeaderMap::new();
+        for line in headers_content.lines().skip(1) {
+            let parts = line.splitn(2, ": ").collect::<Vec<&str>>(); // The error messages below say "part header" although these are the embedded response's headers.
+            if parts.len() == 2 {
+                let header_name = HeaderName::from_str(parts[0]).map_err(|err| 
{
+                    Error::new(
+                        ErrorKind::Unexpected,
+                        "multipart response contains invalid part header name",
+                    )
+                    .set_source(err)
+                })?;
+                let header_value = parts[1].parse().map_err(|err| {
+                    Error::new(
+                        ErrorKind::Unexpected,
+                        "multipart response contains invalid part header 
value",
+                    )
+                    .set_source(err)
+                })?;
 
-            buf.clear();
+                headers.insert(header_name, header_value); // NOTE(review): same last-value-wins behavior as for the part headers.
+            }
         }
-        todo!()
+
+        Ok(Self {
+            part_headers,
+            version: Version::HTTP_11, // NOTE(review): hard-coded; the version in the status line is not parsed.
+            headers,
+            content: Bytes::from(body_content.to_string()), // Copies the body out of the borrowed `&str`.
+
+            method: None,
+            uri: None,
+
+            status_code: Some(StatusCode::from_u16(status_code).map_err(|err| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    "multipart response contains invalid status code",
+                )
+                .set_source(err)
+            })?),
+        })
     }
 }
 
@@ -709,4 +819,190 @@ content-length: 0
                 .replace("\r\n", "\n")
         );
     }
+
+    /// This test is inspired by <https://cloud.google.com/storage/docs/batch>
+    #[test]
+    fn test_multipart_mixed_gcs_batch_metadata_response() {
+        let response = r#"--batch_pK7JBAk73-E=_AA5eFwv4m2Q=
+Content-Type: application/http
+Content-ID: <response-b29c5de2-0db4-490b-b421-6a51b598bd22+1>
+
+HTTP/1.1 200 OK
+ETag: "lGaP-E0memYDumK16YuUDM_6Gf0/V43j6azD55CPRGb9b6uytDYl61Y"
+Content-Type: application/json; charset=UTF-8
+Date: Mon, 22 Jan 2018 18:56:00 GMT
+Expires: Mon, 22 Jan 2018 18:56:00 GMT
+Cache-Control: private, max-age=0
+Content-Length: 846
+
+{"kind": "storage#object","id": 
"example-bucket/obj1/1495822576643790","metadata": {"type": "tabby"}}
+
+--batch_pK7JBAk73-E=_AA5eFwv4m2Q=
+Content-Type: application/http
+Content-ID: <response-b29c5de2-0db4-490b-b421-6a51b598bd22+2>
+
+HTTP/1.1 200 OK
+ETag: "lGaP-E0memYDumK16YuUDM_6Gf0/91POdd-sxSAkJnS8Dm7wMxBSDKk"
+Content-Type: application/json; charset=UTF-8
+Date: Mon, 22 Jan 2018 18:56:00 GMT
+Expires: Mon, 22 Jan 2018 18:56:00 GMT
+Cache-Control: private, max-age=0
+Content-Length: 846
+
+{"kind": "storage#object","id": 
"example-bucket/obj2/1495822576643790","metadata": {"type": "tuxedo"}}
+
+--batch_pK7JBAk73-E=_AA5eFwv4m2Q=
+Content-Type: application/http
+Content-ID: <response-b29c5de2-0db4-490b-b421-6a51b598bd22+3>
+
+HTTP/1.1 200 OK
+ETag: "lGaP-E0memYDumK16YuUDM_6Gf0/d2Z1F1_ZVbB1dC0YKM9rX5VAgIQ"
+Content-Type: application/json; charset=UTF-8
+Date: Mon, 22 Jan 2018 18:56:00 GMT
+Expires: Mon, 22 Jan 2018 18:56:00 GMT
+Cache-Control: private, max-age=0
+Content-Length: 846
+
+{"kind": "storage#object","id": 
"example-bucket/obj3/1495822576643790","metadata": {"type": "calico"}}
+
+--batch_pK7JBAk73-E=_AA5eFwv4m2Q=--"#.replace('\n', "\r\n"); // Use CRLF line endings, as on the wire.
+
+        let multipart: Multipart<MixedPart> = Multipart::new()
+            .with_boundary("batch_pK7JBAk73-E=_AA5eFwv4m2Q=")
+            .parse(Bytes::from(response))
+            .unwrap();
+
+        assert_eq!(multipart.parts.len(), 3); // The empty preamble and the trailing "--" are skipped.
+        assert_eq!(
+            multipart.parts[0],
+            MixedPart {
+                part_headers: {
+                    let mut h = HeaderMap::new();
+                    h.insert("Content-Type", 
"application/http".parse().unwrap());
+                    h.insert(
+                        "Content-ID",
+                        "<response-b29c5de2-0db4-490b-b421-6a51b598bd22+1>"
+                            .parse()
+                            .unwrap(),
+                    );
+
+                    h
+                },
+                version: Version::HTTP_11,
+                headers: {
+                    let mut h = HeaderMap::new();
+                    h.insert(
+                        "ETag",
+                        
"\"lGaP-E0memYDumK16YuUDM_6Gf0/V43j6azD55CPRGb9b6uytDYl61Y\""
+                            .parse()
+                            .unwrap(),
+                    );
+                    h.insert(
+                        "Content-Type",
+                        "application/json; charset=UTF-8".parse().unwrap(),
+                    );
+                    h.insert("Date", "Mon, 22 Jan 2018 18:56:00 
GMT".parse().unwrap());
+                    h.insert("Expires", "Mon, 22 Jan 2018 18:56:00 
GMT".parse().unwrap());
+                    h.insert("Cache-Control", "private, 
max-age=0".parse().unwrap());
+                    h.insert("Content-Length", "846".parse().unwrap());
+
+                    h
+                },
+                content: Bytes::from_static( // The blank line after each JSON body is not part of the expected content.
+                    r#"{"kind": "storage#object","id": 
"example-bucket/obj1/1495822576643790","metadata": {"type": "tabby"}}"#
+                    .as_bytes()
+                ),
+                uri: None,
+                method: None,
+                status_code: Some(StatusCode::from_u16(200).unwrap())
+            }
+        );
+        assert_eq!(
+            multipart.parts[1],
+            MixedPart {
+                part_headers: {
+                    let mut h = HeaderMap::new();
+                    h.insert("Content-Type", 
"application/http".parse().unwrap());
+                    h.insert(
+                        "Content-ID",
+                        "<response-b29c5de2-0db4-490b-b421-6a51b598bd22+2>"
+                            .parse()
+                            .unwrap(),
+                    );
+
+                    h
+                },
+                version: Version::HTTP_11,
+                headers: {
+                    let mut h = HeaderMap::new();
+                    h.insert(
+                        "ETag",
+                        
"\"lGaP-E0memYDumK16YuUDM_6Gf0/91POdd-sxSAkJnS8Dm7wMxBSDKk\""
+                            .parse()
+                            .unwrap(),
+                    );
+                    h.insert(
+                        "Content-Type",
+                        "application/json; charset=UTF-8".parse().unwrap(),
+                    );
+                    h.insert("Date", "Mon, 22 Jan 2018 18:56:00 
GMT".parse().unwrap());
+                    h.insert("Expires", "Mon, 22 Jan 2018 18:56:00 
GMT".parse().unwrap());
+                    h.insert("Cache-Control", "private, 
max-age=0".parse().unwrap());
+                    h.insert("Content-Length", "846".parse().unwrap());
+
+                    h
+                },
+                content: Bytes::from_static(
+                    r#"{"kind": "storage#object","id": 
"example-bucket/obj2/1495822576643790","metadata": {"type": "tuxedo"}}"#
+                    .as_bytes()
+                ),
+                uri: None,
+                method: None,
+                status_code: Some(StatusCode::from_u16(200).unwrap())
+            }
+         );
+        assert_eq!(
+            multipart.parts[2],
+            MixedPart {
+                part_headers: {
+                    let mut h = HeaderMap::new();
+                    h.insert("Content-Type", 
"application/http".parse().unwrap());
+                    h.insert(
+                        "Content-ID",
+                        "<response-b29c5de2-0db4-490b-b421-6a51b598bd22+3>"
+                            .parse()
+                            .unwrap(),
+                    );
+
+                    h
+                },
+                version: Version::HTTP_11,
+                headers: {
+                    let mut h = HeaderMap::new();
+                    h.insert(
+                        "ETag",
+                        
"\"lGaP-E0memYDumK16YuUDM_6Gf0/d2Z1F1_ZVbB1dC0YKM9rX5VAgIQ\""
+                            .parse()
+                            .unwrap(),
+                    );
+                    h.insert(
+                        "Content-Type",
+                        "application/json; charset=UTF-8".parse().unwrap(),
+                    );
+                    h.insert("Date", "Mon, 22 Jan 2018 18:56:00 
GMT".parse().unwrap());
+                    h.insert("Expires", "Mon, 22 Jan 2018 18:56:00 
GMT".parse().unwrap());
+                    h.insert("Cache-Control", "private, 
max-age=0".parse().unwrap());
+                    h.insert("Content-Length", "846".parse().unwrap());
+
+                    h
+                },
+                content: Bytes::from_static(
+                    r#"{"kind": "storage#object","id": 
"example-bucket/obj3/1495822576643790","metadata": {"type": "calico"}}"#
+                    .as_bytes()
+                ),
+                uri: None,
+                method: None,
+                status_code: Some(StatusCode::from_u16(200).unwrap())
+            });
+    }
 }

Reply via email to