tustvold commented on code in PR #5681:
URL: https://github.com/apache/arrow-rs/pull/5681#discussion_r1853988263
##########
object_store/src/azure/client.rs:
##########
@@ -380,6 +589,78 @@ impl AzureClient {
Ok(())
}
+ fn build_bulk_delete_body(
+ &self,
+ boundary: &str,
+ paths: &[Path],
+ credential: &Option<Arc<AzureCredential>>,
+ ) -> Vec<u8> {
+ let mut body_bytes = Vec::with_capacity(paths.len() * 2048);
+
+ for (idx, path) in paths.iter().enumerate() {
+ let url = self.config.path_url(path);
+
+ // Build subrequest with proper authorization
+ let request = self
+ .client
+ .request(Method::DELETE, url)
+ .header(CONTENT_LENGTH, HeaderValue::from(0))
+ // Each subrequest must be authorized individually [1] and we
use
+ // the CredentialExt for this.
+ // [1]:
https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id#request-body
+ .with_azure_authorization(credential, &self.config.account)
+ .build()
+ .unwrap();
+
+ // Url for part requests must be relative and without base
+ let relative_url =
self.config.service.make_relative(request.url()).unwrap();
+
+ serialize_part_delete_request(&mut body_bytes, boundary, idx,
request, relative_url)
+ }
+
+ // Encode end marker
+ extend(&mut body_bytes, b"--");
+ extend(&mut body_bytes, boundary.as_bytes());
+ extend(&mut body_bytes, b"--");
+ extend(&mut body_bytes, b"\r\n");
+ body_bytes
+ }
+
+ pub(crate) async fn bulk_delete_request(&self, paths: Vec<Path>) ->
Result<Vec<Result<Path>>> {
+ if paths.is_empty() {
+ return Ok(Vec::new());
+ }
+
+ let credential = self.get_credential().await?;
+
+ // https://www.ietf.org/rfc/rfc2046
+ let boundary = format!("batch_{}", uuid::Uuid::new_v4());
Review Comment:
I wonder if this actually needs to be a UUID, or if we could just generate
128 bits of random data and avoid the additional dependency.
##########
object_store/src/azure/client.rs:
##########
@@ -240,6 +268,157 @@ impl<'a> PutRequest<'a> {
}
}
+#[inline]
+fn extend(dst: &mut Vec<u8>, data: &[u8]) {
+ dst.extend_from_slice(data);
+}
+
+// Write header names as title case. The header name is assumed to be ASCII.
+fn title_case(dst: &mut Vec<u8>, name: &[u8]) {
Review Comment:
:facepalm: Wow...
##########
object_store/src/azure/client.rs:
##########
@@ -247,6 +275,187 @@ impl<'a> PutRequest<'a> {
}
}
+#[inline]
+fn extend(dst: &mut Vec<u8>, data: &[u8]) {
+ dst.extend_from_slice(data);
+}
+
+// Write header names as title case. The header name is assumed to be ASCII.
+// We need it because Azure is not always treating headers as case insensitive.
+fn title_case(dst: &mut Vec<u8>, name: &[u8]) {
+ dst.reserve(name.len());
+
+ // Ensure first character is uppercased
+ let mut prev = b'-';
+ for &(mut c) in name {
+ if prev == b'-' {
+ c.make_ascii_uppercase();
+ }
+ dst.push(c);
+ prev = c;
+ }
+}
+
+fn write_headers(headers: &HeaderMap, dst: &mut Vec<u8>) {
+ for (name, value) in headers {
+ // We need special case handling here otherwise Azure returns 400
+ // due to `Content-Id` instead of `Content-ID`
+ if name == "content-id" {
+ extend(dst, b"Content-ID");
+ } else {
+ title_case(dst, name.as_str().as_bytes());
+ }
+ extend(dst, b": ");
+ extend(dst, value.as_bytes());
+ extend(dst, b"\r\n");
+ }
+}
+
+//
https://docs.oasis-open.org/odata/odata/v4.0/errata02/os/complete/part1-protocol/odata-v4.0-errata02-os-part1-protocol-complete.html#_Toc406398359
+fn serialize_part_delete_request(
+ dst: &mut Vec<u8>,
+ boundary: &str,
+ idx: usize,
+ request: reqwest::Request,
+ relative_url: String,
+) {
+ // Encode start marker for part
+ extend(dst, b"--");
+ extend(dst, boundary.as_bytes());
+ extend(dst, b"\r\n");
+
+ // Encode part headers
+ let mut part_headers = HeaderMap::new();
+ part_headers.insert(CONTENT_TYPE,
HeaderValue::from_static("application/http"));
+ part_headers.insert(
+ "Content-Transfer-Encoding",
+ HeaderValue::from_static("binary"),
+ );
+ // Azure returns 400 if we send `Content-Id` instead of `Content-ID`
+ part_headers.insert("Content-ID", HeaderValue::from(idx));
+ write_headers(&part_headers, dst);
+ extend(dst, b"\r\n");
+
+ // Encode the subrequest request-line
+ extend(dst, b"DELETE ");
+ extend(dst, format!("/{} ", relative_url).as_bytes());
+ extend(dst, b"HTTP/1.1");
+ extend(dst, b"\r\n");
+
+ // Encode subrequest headers
+ write_headers(request.headers(), dst);
+ extend(dst, b"\r\n");
+ extend(dst, b"\r\n");
+}
+
+fn parse_multipart_response_boundary(response: &Response) -> Result<String> {
+ let invalid_response = |msg: &str| Error::InvalidBulkDeleteResponse {
+ reason: msg.to_string(),
+ };
+
+ let content_type = response
+ .headers()
+ .get(CONTENT_TYPE)
+ .ok_or_else(|| invalid_response("missing Content-Type"))?;
+
+ let boundary = content_type
+ .as_ref()
+ .strip_prefix(b"multipart/mixed; boundary=")
+ .ok_or_else(|| invalid_response("invalid Content-Type value"))?
+ .to_vec();
+
+ let boundary =
+ String::from_utf8(boundary).map_err(|_| invalid_response("invalid
multipart boundary"))?;
+
+ Ok(boundary)
+}
+
+async fn parse_blob_batch_delete_response(
+ batch_response: Response,
+ paths: &[Path],
+) -> Result<Vec<Result<Path>>> {
+ let invalid_response = |msg: &str| Error::InvalidBulkDeleteResponse {
+ reason: msg.to_string(),
+ };
+
+ let boundary = parse_multipart_response_boundary(&batch_response)?;
+
+ let stream = batch_response.bytes_stream();
+
+ let mut multipart = multer::Multipart::new(stream, boundary);
Review Comment:
Sorry to go back and forth on this, but looking at multer, it has a fairly
monstrous dependency footprint, including
https://crates.io/crates/encoding_rs/0.8.35, which is 1MB on its own...
Perhaps we could just revert to the custom parsing logic you had before.
##########
object_store/src/azure/client.rs:
##########
@@ -380,6 +589,78 @@ impl AzureClient {
Ok(())
}
+ fn build_bulk_delete_body(
+ &self,
+ boundary: &str,
+ paths: &[Path],
+ credential: &Option<Arc<AzureCredential>>,
+ ) -> Vec<u8> {
+ let mut body_bytes = Vec::with_capacity(paths.len() * 2048);
+
+ for (idx, path) in paths.iter().enumerate() {
+ let url = self.config.path_url(path);
+
+ // Build subrequest with proper authorization
+ let request = self
+ .client
+ .request(Method::DELETE, url)
+ .header(CONTENT_LENGTH, HeaderValue::from(0))
+ // Each subrequest must be authorized individually [1] and we
use
+ // the CredentialExt for this.
+ // [1]:
https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id#request-body
+ .with_azure_authorization(credential, &self.config.account)
+ .build()
+ .unwrap();
+
+ // Url for part requests must be relative and without base
+ let relative_url =
self.config.service.make_relative(request.url()).unwrap();
+
+ serialize_part_delete_request(&mut body_bytes, boundary, idx,
request, relative_url)
+ }
+
+ // Encode end marker
+ extend(&mut body_bytes, b"--");
+ extend(&mut body_bytes, boundary.as_bytes());
Review Comment:
Should we validate that the boundary doesn't appear in the encoded body?
##########
object_store/src/azure/client.rs:
##########
@@ -240,6 +268,157 @@ impl<'a> PutRequest<'a> {
}
}
+#[inline]
+fn extend(dst: &mut Vec<u8>, data: &[u8]) {
Review Comment:
I personally think it obfuscates what the code is doing, but I don't feel
strongly about it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]