tustvold commented on code in PR #5681:
URL: https://github.com/apache/arrow-rs/pull/5681#discussion_r1606715122


##########
object_store/src/azure/client.rs:
##########
@@ -367,6 +546,112 @@ impl AzureClient {
         Ok(())
     }
 
+    pub async fn bulk_delete_request(&self, paths: Vec<Path>) -> 
Result<Vec<Result<Path>>> {
+        if paths.is_empty() {
+            return Ok(Vec::new());
+        }
+
+        let credential = self.get_credential().await?;
+
+        let boundary = format!("batch_{}", uuid::Uuid::new_v4());

Review Comment:
   ```suggestion
           // https://www.ietf.org/rfc/rfc2046
           let boundary = format!("batch_{}", uuid::Uuid::new_v4());
   ```



##########
object_store/src/azure/client.rs:
##########
@@ -240,6 +268,157 @@ impl<'a> PutRequest<'a> {
     }
 }
 
+#[inline]
+fn extend(dst: &mut Vec<u8>, data: &[u8]) {
+    dst.extend_from_slice(data);
+}
+
+// Write header names as title case. The header name is assumed to be ASCII.
+fn title_case(dst: &mut Vec<u8>, name: &[u8]) {
+    dst.reserve(name.len());
+
+    // Ensure first character is uppercased
+    let mut prev = b'-';
+    for &(mut c) in name {
+        if prev == b'-' {
+            c.make_ascii_uppercase();
+        }
+        dst.push(c);
+        prev = c;
+    }
+}
+
+fn write_headers(headers: &HeaderMap, dst: &mut Vec<u8>) {
+    for (name, value) in headers {
+        if name == "content-id" {
+            extend(dst, b"Content-ID");
+        } else {
+            title_case(dst, name.as_str().as_bytes());
+        }
+        extend(dst, b": ");
+        extend(dst, value.as_bytes());
+        extend(dst, b"\r\n");
+    }
+}
+
+fn serialize_part_request(
+    dst: &mut Vec<u8>,
+    boundary: &str,
+    idx: usize,
+    request: reqwest::Request,
+    relative_url: String,
+) {
+    // Encode start marker for part
+    extend(dst, b"--");
+    extend(dst, boundary.as_bytes());
+    extend(dst, b"\r\n");
+
+    // Encode part headers
+    let mut part_headers = HeaderMap::new();
+    part_headers.insert(CONTENT_TYPE, "application/http".parse().unwrap());

Review Comment:
   I think you can use HeaderValue::from_static here



##########
object_store/src/azure/client.rs:
##########
@@ -240,6 +268,157 @@ impl<'a> PutRequest<'a> {
     }
 }
 
+#[inline]
+fn extend(dst: &mut Vec<u8>, data: &[u8]) {
+    dst.extend_from_slice(data);
+}
+
+// Write header names as title case. The header name is assumed to be ASCII.
+fn title_case(dst: &mut Vec<u8>, name: &[u8]) {
+    dst.reserve(name.len());
+
+    // Ensure first character is uppercased
+    let mut prev = b'-';
+    for &(mut c) in name {
+        if prev == b'-' {
+            c.make_ascii_uppercase();
+        }
+        dst.push(c);
+        prev = c;
+    }
+}
+
+fn write_headers(headers: &HeaderMap, dst: &mut Vec<u8>) {
+    for (name, value) in headers {
+        if name == "content-id" {
+            extend(dst, b"Content-ID");
+        } else {
+            title_case(dst, name.as_str().as_bytes());
+        }
+        extend(dst, b": ");
+        extend(dst, value.as_bytes());
+        extend(dst, b"\r\n");
+    }
+}
+
+fn serialize_part_request(
+    dst: &mut Vec<u8>,
+    boundary: &str,
+    idx: usize,
+    request: reqwest::Request,
+    relative_url: String,
+) {
+    // Encode start marker for part
+    extend(dst, b"--");
+    extend(dst, boundary.as_bytes());
+    extend(dst, b"\r\n");
+
+    // Encode part headers
+    let mut part_headers = HeaderMap::new();
+    part_headers.insert(CONTENT_TYPE, "application/http".parse().unwrap());
+    part_headers.insert("Content-Transfer-Encoding", 
"binary".parse().unwrap());
+    part_headers.insert("Content-ID", HeaderValue::from(idx));
+    write_headers(&part_headers, dst);
+    extend(dst, b"\r\n");
+
+    // Encode the subrequest request-line
+    extend(dst, b"DELETE ");
+    extend(dst, format!("/{} ", relative_url).as_bytes());
+    extend(dst, b"HTTP/1.1");
+    extend(dst, b"\r\n");
+
+    // Encode subrequest headers
+    write_headers(request.headers(), dst);
+    extend(dst, b"\r\n");
+    extend(dst, b"\r\n");
+}
+
+fn parse_response_part(
+    remaining: &mut &[u8],
+    results: &mut [Result<Path>],
+    paths: &[Path],
+) -> Result<()> {
+    let invalid_response = |msg: &str| Error::InvalidBulkDeleteResponse {
+        reason: msg.to_string(),
+    };
+
+    // Parse part headers and retrieve part id
+    let mut headers = [httparse::EMPTY_HEADER; 4];

Review Comment:
   This appears to be hard-coding an assumption that the response only contains 
4 headers. Is this something that can be relied upon?



##########
object_store/src/azure/client.rs:
##########
@@ -240,6 +268,157 @@ impl<'a> PutRequest<'a> {
     }
 }
 
+#[inline]
+fn extend(dst: &mut Vec<u8>, data: &[u8]) {
+    dst.extend_from_slice(data);
+}
+
+// Write header names as title case. The header name is assumed to be ASCII.
+fn title_case(dst: &mut Vec<u8>, name: &[u8]) {
+    dst.reserve(name.len());
+
+    // Ensure first character is uppercased
+    let mut prev = b'-';
+    for &(mut c) in name {
+        if prev == b'-' {
+            c.make_ascii_uppercase();
+        }
+        dst.push(c);
+        prev = c;
+    }
+}
+
+fn write_headers(headers: &HeaderMap, dst: &mut Vec<u8>) {
+    for (name, value) in headers {
+        if name == "content-id" {
+            extend(dst, b"Content-ID");
+        } else {
+            title_case(dst, name.as_str().as_bytes());
+        }
+        extend(dst, b": ");
+        extend(dst, value.as_bytes());
+        extend(dst, b"\r\n");
+    }
+}
+
+fn serialize_part_request(
+    dst: &mut Vec<u8>,
+    boundary: &str,
+    idx: usize,
+    request: reqwest::Request,
+    relative_url: String,
+) {
+    // Encode start marker for part
+    extend(dst, b"--");
+    extend(dst, boundary.as_bytes());
+    extend(dst, b"\r\n");
+
+    // Encode part headers
+    let mut part_headers = HeaderMap::new();
+    part_headers.insert(CONTENT_TYPE, "application/http".parse().unwrap());
+    part_headers.insert("Content-Transfer-Encoding", 
"binary".parse().unwrap());
+    part_headers.insert("Content-ID", HeaderValue::from(idx));
+    write_headers(&part_headers, dst);
+    extend(dst, b"\r\n");
+
+    // Encode the subrequest request-line
+    extend(dst, b"DELETE ");
+    extend(dst, format!("/{} ", relative_url).as_bytes());
+    extend(dst, b"HTTP/1.1");
+    extend(dst, b"\r\n");
+
+    // Encode subrequest headers
+    write_headers(request.headers(), dst);
+    extend(dst, b"\r\n");
+    extend(dst, b"\r\n");
+}
+
+fn parse_response_part(
+    remaining: &mut &[u8],
+    results: &mut [Result<Path>],
+    paths: &[Path],
+) -> Result<()> {
+    let invalid_response = |msg: &str| Error::InvalidBulkDeleteResponse {
+        reason: msg.to_string(),
+    };
+
+    // Parse part headers and retrieve part id
+    let mut headers = [httparse::EMPTY_HEADER; 4];
+    let id = match httparse::parse_headers(remaining, &mut headers) {
+        Ok(httparse::Status::Complete((pos, headers))) => {
+            *remaining = &remaining[pos..];
+            headers
+                .iter()
+                .find(|h| h.name.to_lowercase() == "content-id")
+                .and_then(|h| std::str::from_utf8(h.value).ok())
+                .and_then(|v| v.parse::<usize>().ok())
+        }
+        _ => return Err(invalid_response("unable to parse parse 
headers").into()),
+    };
+
+    // Parse part response headers
+    let mut headers = [httparse::EMPTY_HEADER; 10];

Review Comment:
   Similar comment to above: this hard-codes a limit of 10 headers.



##########
object_store/src/azure/client.rs:
##########
@@ -240,6 +268,157 @@ impl<'a> PutRequest<'a> {
     }
 }
 
+#[inline]
+fn extend(dst: &mut Vec<u8>, data: &[u8]) {
+    dst.extend_from_slice(data);
+}
+
+// Write header names as title case. The header name is assumed to be ASCII.
+fn title_case(dst: &mut Vec<u8>, name: &[u8]) {
+    dst.reserve(name.len());
+
+    // Ensure first character is uppercased
+    let mut prev = b'-';
+    for &(mut c) in name {
+        if prev == b'-' {
+            c.make_ascii_uppercase();
+        }
+        dst.push(c);
+        prev = c;
+    }
+}
+
+fn write_headers(headers: &HeaderMap, dst: &mut Vec<u8>) {
+    for (name, value) in headers {
+        if name == "content-id" {

Review Comment:
   Why is this special case needed?



##########
object_store/src/azure/client.rs:
##########
@@ -367,6 +546,112 @@ impl AzureClient {
         Ok(())
     }
 
+    pub async fn bulk_delete_request(&self, paths: Vec<Path>) -> 
Result<Vec<Result<Path>>> {
+        if paths.is_empty() {
+            return Ok(Vec::new());
+        }
+
+        let credential = self.get_credential().await?;
+
+        let boundary = format!("batch_{}", uuid::Uuid::new_v4());
+
+        let mut body_bytes = Vec::with_capacity(paths.len() * 256);
+
+        for (idx, path) in paths.iter().enumerate() {
+            let url = self.config.path_url(path);
+
+            // Build subrequest with proper authorization
+            let request = self
+                .client
+                .request(Method::DELETE, url)
+                .header(CONTENT_LENGTH, HeaderValue::from(0))
+                .with_azure_authorization(&credential, &self.config.account)
+                .build()
+                .unwrap();
+
+            // Url for part requests must be relative and without base
+            let relative_url = 
self.config.service.make_relative(request.url()).unwrap();
+
+            serialize_part_request(&mut body_bytes, &boundary, idx, request, 
relative_url)
+        }
+
+        // Encode end marker
+        extend(&mut body_bytes, b"--");
+        extend(&mut body_bytes, boundary.as_bytes());
+        extend(&mut body_bytes, b"--");
+        extend(&mut body_bytes, b"\r\n");
+
+        // Send multipart request
+        let url = self.config.path_url(&Path::from("/"));
+        let batch_response = self
+            .client
+            .request(Method::POST, url)
+            .query(&[("restype", "container"), ("comp", "batch")])
+            .header(
+                CONTENT_TYPE,
+                HeaderValue::from_str(format!("multipart/mixed; boundary={}", 
boundary).as_str())
+                    .unwrap(),
+            )
+            .header(CONTENT_LENGTH, HeaderValue::from(body_bytes.len()))
+            .body(body_bytes)
+            .with_azure_authorization(&credential, &self.config.account)

Review Comment:
   Is this necessary?



##########
object_store/src/azure/client.rs:
##########
@@ -240,6 +268,157 @@ impl<'a> PutRequest<'a> {
     }
 }
 
+#[inline]
+fn extend(dst: &mut Vec<u8>, data: &[u8]) {

Review Comment:
   Is this needed?



##########
object_store/src/azure/client.rs:
##########
@@ -240,6 +268,157 @@ impl<'a> PutRequest<'a> {
     }
 }
 
+#[inline]
+fn extend(dst: &mut Vec<u8>, data: &[u8]) {
+    dst.extend_from_slice(data);
+}
+
+// Write header names as title case. The header name is assumed to be ASCII.
+fn title_case(dst: &mut Vec<u8>, name: &[u8]) {

Review Comment:
   FWIW, this may not be necessary, as header names are case-insensitive.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to