This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b49c344406 Add put_multipart_opts (#5435) (#5652)
4b49c344406 is described below

commit 4b49c344406b63825ef41efbdc85cf09afe35966
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed Apr 17 11:51:56 2024 +0100

    Add put_multipart_opts (#5435) (#5652)
    
    * Add put_multipart_opts (#5435)
---
 object_store/src/aws/client.rs       | 203 +++++++++++++++++------------------
 object_store/src/aws/mod.rs          |  48 ++++++---
 object_store/src/azure/client.rs     |  93 +++++++++-------
 object_store/src/azure/mod.rs        |  16 ++-
 object_store/src/chunked.rs          |  10 +-
 object_store/src/gcp/client.rs       | 174 ++++++++++++++----------------
 object_store/src/gcp/mod.rs          |  15 ++-
 object_store/src/http/mod.rs         |   8 +-
 object_store/src/lib.rs              | 104 ++++++++++++++++--
 object_store/src/limit.rs            |  16 ++-
 object_store/src/local.rs            |  12 ++-
 object_store/src/memory.rs           |  21 ++--
 object_store/src/prefix.rs           |  13 ++-
 object_store/src/throttle.rs         |  14 ++-
 object_store/tests/get_range_file.rs |   6 +-
 15 files changed, 461 insertions(+), 292 deletions(-)

diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index e81ef6aa220..4a4dc178d5b 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -19,8 +19,8 @@ use crate::aws::builder::S3EncryptionHeaders;
 use crate::aws::checksum::Checksum;
 use crate::aws::credential::{AwsCredential, CredentialExt};
 use crate::aws::{
-    AwsAuthorizer, AwsCredentialProvider, S3ConditionalPut, S3CopyIfNotExists, 
STORE,
-    STRICT_PATH_ENCODE_SET,
+    AwsAuthorizer, AwsCredentialProvider, S3ConditionalPut, S3CopyIfNotExists, 
COPY_SOURCE_HEADER,
+    STORE, STRICT_PATH_ENCODE_SET, TAGS_HEADER,
 };
 use crate::client::get::GetClient;
 use crate::client::header::{get_etag, HeaderConfig};
@@ -35,16 +35,16 @@ use crate::client::GetOptionsExt;
 use crate::multipart::PartId;
 use crate::path::DELIMITER;
 use crate::{
-    Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId, 
Path, PutPayload,
-    PutResult, Result, RetryConfig,
+    Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId, 
Path,
+    PutMultipartOpts, PutPayload, PutResult, Result, RetryConfig, TagSet,
 };
 use async_trait::async_trait;
 use base64::prelude::BASE64_STANDARD;
 use base64::Engine;
 use bytes::{Buf, Bytes};
 use hyper::header::{CACHE_CONTROL, CONTENT_LENGTH};
-use hyper::http;
 use hyper::http::HeaderName;
+use hyper::{http, HeaderMap};
 use itertools::Itertools;
 use md5::{Digest, Md5};
 use percent_encoding::{utf8_percent_encode, PercentEncode};
@@ -98,9 +98,6 @@ pub(crate) enum Error {
     #[snafu(display("Error getting list response body: {}", source))]
     ListResponseBody { source: reqwest::Error },
 
-    #[snafu(display("Error performing create multipart request: {}", source))]
-    CreateMultipartRequest { source: crate::client::retry::Error },
-
     #[snafu(display("Error getting create multipart response body: {}", 
source))]
     CreateMultipartResponseBody { source: reqwest::Error },
 
@@ -289,8 +286,75 @@ impl<'a> Request<'a> {
         Self { builder, ..self }
     }
 
-    pub fn idempotent(mut self, idempotent: bool) -> Self {
-        self.idempotent = idempotent;
+    pub fn headers(self, headers: HeaderMap) -> Self {
+        let builder = self.builder.headers(headers);
+        Self { builder, ..self }
+    }
+
+    pub fn idempotent(self, idempotent: bool) -> Self {
+        Self { idempotent, ..self }
+    }
+
+    pub fn with_encryption_headers(self) -> Self {
+        let headers = self.config.encryption_headers.clone().into();
+        let builder = self.builder.headers(headers);
+        Self { builder, ..self }
+    }
+
+    pub fn with_session_creds(self, use_session_creds: bool) -> Self {
+        Self {
+            use_session_creds,
+            ..self
+        }
+    }
+
+    pub fn with_tags(mut self, tags: TagSet) -> Self {
+        let tags = tags.encoded();
+        if !tags.is_empty() && !self.config.disable_tagging {
+            self.builder = self.builder.header(&TAGS_HEADER, tags);
+        }
+        self
+    }
+
+    pub fn with_attributes(self, attributes: Attributes) -> Self {
+        let mut has_content_type = false;
+        let mut builder = self.builder;
+        for (k, v) in &attributes {
+            builder = match k {
+                Attribute::CacheControl => builder.header(CACHE_CONTROL, 
v.as_ref()),
+                Attribute::ContentType => {
+                    has_content_type = true;
+                    builder.header(CONTENT_TYPE, v.as_ref())
+                }
+            };
+        }
+
+        if !has_content_type {
+            if let Some(value) = 
self.config.client_options.get_content_type(self.path) {
+                builder = builder.header(CONTENT_TYPE, value);
+            }
+        }
+        Self { builder, ..self }
+    }
+
+    pub fn with_payload(mut self, payload: PutPayload) -> Self {
+        if !self.config.skip_signature || self.config.checksum.is_some() {
+            let mut sha256 = Context::new(&digest::SHA256);
+            payload.iter().for_each(|x| sha256.update(x));
+            let payload_sha256 = sha256.finish();
+
+            if let Some(Checksum::SHA256) = self.config.checksum {
+                self.builder = self.builder.header(
+                    "x-amz-checksum-sha256",
+                    BASE64_STANDARD.encode(payload_sha256),
+                );
+            }
+            self.payload_sha256 = Some(payload_sha256);
+        }
+
+        let content_length = payload.content_length();
+        self.builder = self.builder.header(CONTENT_LENGTH, content_length);
+        self.payload = Some(payload);
         self
     }
 
@@ -335,81 +399,19 @@ impl S3Client {
         Ok(Self { config, client })
     }
 
-    /// Make an S3 PUT request 
<https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html>
-    ///
-    /// Returns the ETag
-    pub fn put_request<'a>(
-        &'a self,
-        path: &'a Path,
-        payload: PutPayload,
-        attributes: Attributes,
-        with_encryption_headers: bool,
-    ) -> Request<'a> {
+    pub fn request<'a>(&'a self, method: Method, path: &'a Path) -> 
Request<'a> {
         let url = self.config.path_url(path);
-        let mut builder = self.client.request(Method::PUT, url);
-        if with_encryption_headers {
-            builder = 
builder.headers(self.config.encryption_headers.clone().into());
-        }
-
-        let mut sha256 = Context::new(&digest::SHA256);
-        payload.iter().for_each(|x| sha256.update(x));
-        let payload_sha256 = sha256.finish();
-
-        if let Some(Checksum::SHA256) = self.config.checksum {
-            builder = builder.header(
-                "x-amz-checksum-sha256",
-                BASE64_STANDARD.encode(payload_sha256),
-            )
-        }
-
-        let mut has_content_type = false;
-        for (k, v) in &attributes {
-            builder = match k {
-                Attribute::CacheControl => builder.header(CACHE_CONTROL, 
v.as_ref()),
-                Attribute::ContentType => {
-                    has_content_type = true;
-                    builder.header(CONTENT_TYPE, v.as_ref())
-                }
-            };
-        }
-
-        if !has_content_type {
-            if let Some(value) = 
self.config.client_options.get_content_type(path) {
-                builder = builder.header(CONTENT_TYPE, value);
-            }
-        }
-
         Request {
             path,
-            builder: builder.header(CONTENT_LENGTH, payload.content_length()),
-            payload: Some(payload),
-            payload_sha256: Some(payload_sha256),
+            builder: self.client.request(method, url),
+            payload: None,
+            payload_sha256: None,
             config: &self.config,
             use_session_creds: true,
             idempotent: false,
         }
     }
 
-    /// Make an S3 Delete request 
<https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html>
-    pub async fn delete_request<T: Serialize + ?Sized + Sync>(
-        &self,
-        path: &Path,
-        query: &T,
-    ) -> Result<()> {
-        let credential = self.config.get_session_credential().await?;
-        let url = self.config.path_url(path);
-
-        self.client
-            .request(Method::DELETE, url)
-            .query(query)
-            .with_aws_sigv4(credential.authorizer(), None)
-            .send_retry(&self.config.retry_config)
-            .await
-            .map_err(|e| e.error(STORE, path.to_string()))?;
-
-        Ok(())
-    }
-
     /// Make an S3 Delete Objects request 
<https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html>
     ///
     /// Produces a vector of results, one for each path in the input vector. If
@@ -513,41 +515,29 @@ impl S3Client {
     }
 
     /// Make an S3 Copy request 
<https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html>
-    pub fn copy_request<'a>(&'a self, from: &'a Path, to: &Path) -> 
Request<'a> {
-        let url = self.config.path_url(to);
+    pub fn copy_request<'a>(&'a self, from: &Path, to: &'a Path) -> 
Request<'a> {
         let source = format!("{}/{}", self.config.bucket, encode_path(from));
-
-        let builder = self
-            .client
-            .request(Method::PUT, url)
-            .header("x-amz-copy-source", source)
-            .headers(self.config.encryption_headers.clone().into());
-
-        Request {
-            builder,
-            path: from,
-            config: &self.config,
-            payload: None,
-            payload_sha256: None,
-            use_session_creds: false,
-            idempotent: false,
-        }
+        self.request(Method::PUT, to)
+            .idempotent(true)
+            .header(&COPY_SOURCE_HEADER, &source)
+            .headers(self.config.encryption_headers.clone().into())
+            .with_session_creds(false)
     }
 
-    pub async fn create_multipart(&self, location: &Path) -> 
Result<MultipartId> {
-        let credential = self.config.get_session_credential().await?;
-        let url = format!("{}?uploads=", self.config.path_url(location),);
-
+    pub async fn create_multipart(
+        &self,
+        location: &Path,
+        opts: PutMultipartOpts,
+    ) -> Result<MultipartId> {
         let response = self
-            .client
-            .request(Method::POST, url)
-            .headers(self.config.encryption_headers.clone().into())
-            .with_aws_sigv4(credential.authorizer(), None)
-            .retryable(&self.config.retry_config)
+            .request(Method::POST, location)
+            .query(&[("uploads", "")])
+            .with_encryption_headers()
+            .with_attributes(opts.attributes)
+            .with_tags(opts.tags)
             .idempotent(true)
             .send()
-            .await
-            .context(CreateMultipartRequestSnafu)?
+            .await?
             .bytes()
             .await
             .context(CreateMultipartResponseBodySnafu)?;
@@ -568,7 +558,8 @@ impl S3Client {
         let part = (part_idx + 1).to_string();
 
         let response = self
-            .put_request(path, data, Attributes::default(), false)
+            .request(Method::PUT, path)
+            .with_payload(data)
             .query(&[("partNumber", &part), ("uploadId", upload_id)])
             .idempotent(true)
             .send()
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 43bd38a6de2..7f1edf12faf 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -45,10 +45,12 @@ use crate::signer::Signer;
 use crate::util::STRICT_ENCODE_SET;
 use crate::{
     Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, 
ObjectMeta,
-    ObjectStore, Path, PutMode, PutOptions, PutPayload, PutResult, Result, 
UploadPart,
+    ObjectStore, Path, PutMode, PutMultipartOpts, PutOptions, PutPayload, 
PutResult, Result,
+    UploadPart,
 };
 
 static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging");
+static COPY_SOURCE_HEADER: HeaderName = 
HeaderName::from_static("x-amz-copy-source");
 
 mod builder;
 mod checksum;
@@ -156,12 +158,13 @@ impl ObjectStore for AmazonS3 {
         payload: PutPayload,
         opts: PutOptions,
     ) -> Result<PutResult> {
-        let attrs = opts.attributes;
-        let mut request = self.client.put_request(location, payload, attrs, 
true);
-        let tags = opts.tags.encoded();
-        if !tags.is_empty() && !self.client.config.disable_tagging {
-            request = request.header(&TAGS_HEADER, tags);
-        }
+        let request = self
+            .client
+            .request(Method::PUT, location)
+            .with_payload(payload)
+            .with_attributes(opts.attributes)
+            .with_tags(opts.tags)
+            .with_encryption_headers();
 
         match (opts.mode, &self.client.config.conditional_put) {
             (PutMode::Overwrite, _) => request.idempotent(true).do_put().await,
@@ -204,8 +207,12 @@ impl ObjectStore for AmazonS3 {
         }
     }
 
-    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
-        let upload_id = self.client.create_multipart(location).await?;
+    async fn put_multipart_opts(
+        &self,
+        location: &Path,
+        opts: PutMultipartOpts,
+    ) -> Result<Box<dyn MultipartUpload>> {
+        let upload_id = self.client.create_multipart(location, opts).await?;
 
         Ok(Box::new(S3MultiPartUpload {
             part_idx: 0,
@@ -223,7 +230,8 @@ impl ObjectStore for AmazonS3 {
     }
 
     async fn delete(&self, location: &Path) -> Result<()> {
-        self.client.delete_request(location, &()).await
+        self.client.request(Method::DELETE, location).send().await?;
+        Ok(())
     }
 
     fn delete_stream<'a>(
@@ -351,15 +359,22 @@ impl MultipartUpload for S3MultiPartUpload {
     async fn abort(&mut self) -> Result<()> {
         self.state
             .client
-            .delete_request(&self.state.location, &[("uploadId", 
&self.state.upload_id)])
-            .await
+            .request(Method::DELETE, &self.state.location)
+            .query(&[("uploadId", &self.state.upload_id)])
+            .idempotent(true)
+            .send()
+            .await?;
+
+        Ok(())
     }
 }
 
 #[async_trait]
 impl MultipartStore for AmazonS3 {
     async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
-        self.client.create_multipart(path).await
+        self.client
+            .create_multipart(path, PutMultipartOpts::default())
+            .await
     }
 
     async fn put_part(
@@ -382,7 +397,12 @@ impl MultipartStore for AmazonS3 {
     }
 
     async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> 
Result<()> {
-        self.client.delete_request(path, &[("uploadId", id)]).await
+        self.client
+            .request(Method::DELETE, path)
+            .query(&[("uploadId", id)])
+            .send()
+            .await?;
+        Ok(())
     }
 }
 
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index 134609eb262..918fcd047ae 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -28,16 +28,14 @@ use crate::path::DELIMITER;
 use crate::util::{deserialize_rfc1123, GetRange};
 use crate::{
     Attribute, Attributes, ClientOptions, GetOptions, ListResult, ObjectMeta, 
Path, PutMode,
-    PutOptions, PutPayload, PutResult, Result, RetryConfig,
+    PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, RetryConfig, 
TagSet,
 };
 use async_trait::async_trait;
 use base64::prelude::BASE64_STANDARD;
 use base64::Engine;
 use bytes::{Buf, Bytes};
 use chrono::{DateTime, Utc};
-use hyper::header::CACHE_CONTROL;
 use hyper::http::HeaderName;
-use reqwest::header::CONTENT_TYPE;
 use reqwest::{
     header::{HeaderValue, CONTENT_LENGTH, IF_MATCH, IF_NONE_MATCH},
     Client as ReqwestClient, Method, RequestBuilder, Response,
@@ -50,6 +48,8 @@ use std::time::Duration;
 use url::Url;
 
 const VERSION_HEADER: &str = "x-ms-version-id";
+static MS_CACHE_CONTROL: HeaderName = 
HeaderName::from_static("x-ms-blob-cache-control");
+static MS_CONTENT_TYPE: HeaderName = 
HeaderName::from_static("x-ms-blob-content-type");
 
 static TAGS_HEADER: HeaderName = HeaderName::from_static("x-ms-tags");
 
@@ -188,10 +188,39 @@ impl<'a> PutRequest<'a> {
         Self { builder, ..self }
     }
 
-    fn set_idempotent(self, idempotent: bool) -> Self {
+    fn idempotent(self, idempotent: bool) -> Self {
         Self { idempotent, ..self }
     }
 
+    fn with_tags(mut self, tags: TagSet) -> Self {
+        let tags = tags.encoded();
+        if !tags.is_empty() && !self.config.disable_tagging {
+            self.builder = self.builder.header(&TAGS_HEADER, tags);
+        }
+        self
+    }
+
+    fn with_attributes(self, attributes: Attributes) -> Self {
+        let mut builder = self.builder;
+        let mut has_content_type = false;
+        for (k, v) in &attributes {
+            builder = match k {
+                Attribute::CacheControl => builder.header(&MS_CACHE_CONTROL, 
v.as_ref()),
+                Attribute::ContentType => {
+                    has_content_type = true;
+                    builder.header(&MS_CONTENT_TYPE, v.as_ref())
+                }
+            };
+        }
+
+        if !has_content_type {
+            if let Some(value) = 
self.config.client_options.get_content_type(self.path) {
+                builder = builder.header(&MS_CONTENT_TYPE, value);
+            }
+        }
+        Self { builder, ..self }
+    }
+
     async fn send(self) -> Result<Response> {
         let credential = self.config.get_credential().await?;
         let response = self
@@ -233,32 +262,9 @@ impl AzureClient {
         self.config.get_credential().await
     }
 
-    fn put_request<'a>(
-        &'a self,
-        path: &'a Path,
-        payload: PutPayload,
-        attributes: Attributes,
-    ) -> PutRequest<'a> {
+    fn put_request<'a>(&'a self, path: &'a Path, payload: PutPayload) -> 
PutRequest<'a> {
         let url = self.config.path_url(path);
-
-        let mut builder = self.client.request(Method::PUT, url);
-
-        let mut has_content_type = false;
-        for (k, v) in &attributes {
-            builder = match k {
-                Attribute::CacheControl => builder.header(CACHE_CONTROL, 
v.as_ref()),
-                Attribute::ContentType => {
-                    has_content_type = true;
-                    builder.header(CONTENT_TYPE, v.as_ref())
-                }
-            };
-        }
-
-        if !has_content_type {
-            if let Some(value) = 
self.config.client_options.get_content_type(path) {
-                builder = builder.header(CONTENT_TYPE, value);
-            }
-        }
+        let builder = self.client.request(Method::PUT, url);
 
         PutRequest {
             path,
@@ -276,10 +282,13 @@ impl AzureClient {
         payload: PutPayload,
         opts: PutOptions,
     ) -> Result<PutResult> {
-        let builder = self.put_request(path, payload, opts.attributes);
+        let builder = self
+            .put_request(path, payload)
+            .with_attributes(opts.attributes)
+            .with_tags(opts.tags);
 
         let builder = match &opts.mode {
-            PutMode::Overwrite => builder.set_idempotent(true),
+            PutMode::Overwrite => builder.idempotent(true),
             PutMode::Create => builder.header(&IF_NONE_MATCH, "*"),
             PutMode::Update(v) => {
                 let etag = v.e_tag.as_ref().context(MissingETagSnafu)?;
@@ -287,11 +296,6 @@ impl AzureClient {
             }
         };
 
-        let builder = match (opts.tags.encoded(), self.config.disable_tagging) 
{
-            ("", _) | (_, true) => builder,
-            (tags, false) => builder.header(&TAGS_HEADER, tags),
-        };
-
         let response = builder.header(&BLOB_TYPE, "BlockBlob").send().await?;
         Ok(get_put_result(response.headers(), 
VERSION_HEADER).context(MetadataSnafu)?)
     }
@@ -306,9 +310,9 @@ impl AzureClient {
         let content_id = format!("{part_idx:20}");
         let block_id = BASE64_STANDARD.encode(&content_id);
 
-        self.put_request(path, payload, Attributes::default())
+        self.put_request(path, payload)
             .query(&[("comp", "block"), ("blockid", &block_id)])
-            .set_idempotent(true)
+            .idempotent(true)
             .send()
             .await?;
 
@@ -316,7 +320,12 @@ impl AzureClient {
     }
 
     /// PUT a block list 
<https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list>
-    pub async fn put_block_list(&self, path: &Path, parts: Vec<PartId>) -> 
Result<PutResult> {
+    pub async fn put_block_list(
+        &self,
+        path: &Path,
+        parts: Vec<PartId>,
+        opts: PutMultipartOpts,
+    ) -> Result<PutResult> {
         let blocks = parts
             .into_iter()
             .map(|part| BlockId::from(part.content_id))
@@ -324,9 +333,11 @@ impl AzureClient {
 
         let payload = BlockList { blocks }.to_xml().into();
         let response = self
-            .put_request(path, payload, Attributes::default())
+            .put_request(path, payload)
+            .with_attributes(opts.attributes)
+            .with_tags(opts.tags)
             .query(&[("comp", "blocklist")])
-            .set_idempotent(true)
+            .idempotent(true)
             .send()
             .await?;
 
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 3bb57c45aa6..25ae6dda68a 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -27,7 +27,7 @@ use crate::{
     path::Path,
     signer::Signer,
     GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, 
ObjectMeta, ObjectStore,
-    PutOptions, PutPayload, PutResult, Result, UploadPart,
+    PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart,
 };
 use async_trait::async_trait;
 use futures::stream::BoxStream;
@@ -95,9 +95,14 @@ impl ObjectStore for MicrosoftAzure {
         self.client.put_blob(location, payload, opts).await
     }
 
-    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
+    async fn put_multipart_opts(
+        &self,
+        location: &Path,
+        opts: PutMultipartOpts,
+    ) -> Result<Box<dyn MultipartUpload>> {
         Ok(Box::new(AzureMultiPartUpload {
             part_idx: 0,
+            opts,
             state: Arc::new(UploadState {
                 client: Arc::clone(&self.client),
                 location: location.clone(),
@@ -196,6 +201,7 @@ impl Signer for MicrosoftAzure {
 struct AzureMultiPartUpload {
     part_idx: usize,
     state: Arc<UploadState>,
+    opts: PutMultipartOpts,
 }
 
 #[derive(Debug)]
@@ -223,7 +229,7 @@ impl MultipartUpload for AzureMultiPartUpload {
 
         self.state
             .client
-            .put_block_list(&self.state.location, parts)
+            .put_block_list(&self.state.location, parts, std::mem::take(&mut 
self.opts))
             .await
     }
 
@@ -255,7 +261,9 @@ impl MultipartStore for MicrosoftAzure {
         _: &MultipartId,
         parts: Vec<PartId>,
     ) -> Result<PutResult> {
-        self.client.put_block_list(path, parts).await
+        self.client
+            .put_block_list(path, parts, Default::default())
+            .await
     }
 
     async fn abort_multipart(&self, _: &Path, _: &MultipartId) -> Result<()> {
diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs
index 9abe49dbfce..a3bd7626787 100644
--- a/object_store/src/chunked.rs
+++ b/object_store/src/chunked.rs
@@ -29,7 +29,7 @@ use futures::StreamExt;
 use crate::path::Path;
 use crate::{
     GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, 
ObjectMeta, ObjectStore,
-    PutOptions, PutResult,
+    PutMultipartOpts, PutOptions, PutResult,
 };
 use crate::{PutPayload, Result};
 
@@ -75,6 +75,14 @@ impl ObjectStore for ChunkedStore {
         self.inner.put_multipart(location).await
     }
 
+    async fn put_multipart_opts(
+        &self,
+        location: &Path,
+        opts: PutMultipartOpts,
+    ) -> Result<Box<dyn MultipartUpload>> {
+        self.inner.put_multipart_opts(location, opts).await
+    }
+
     async fn get_opts(&self, location: &Path, options: GetOptions) -> 
Result<GetResult> {
         let r = self.inner.get_opts(location, options).await?;
         let stream = match r.payload {
diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs
index 4ee03eaad62..9c39efe6b23 100644
--- a/object_store/src/gcp/client.rs
+++ b/object_store/src/gcp/client.rs
@@ -29,8 +29,8 @@ use crate::multipart::PartId;
 use crate::path::{Path, DELIMITER};
 use crate::util::hex_encode;
 use crate::{
-    Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId, 
PutMode, PutOptions,
-    PutPayload, PutResult, Result, RetryConfig,
+    Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId, 
PutMode,
+    PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, RetryConfig,
 };
 use async_trait::async_trait;
 use base64::prelude::BASE64_STANDARD;
@@ -39,7 +39,7 @@ use bytes::Buf;
 use hyper::header::{CACHE_CONTROL, CONTENT_LENGTH, CONTENT_TYPE};
 use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC};
 use reqwest::header::HeaderName;
-use reqwest::{header, Client, Method, RequestBuilder, Response, StatusCode};
+use reqwest::{Client, Method, RequestBuilder, Response, StatusCode};
 use serde::{Deserialize, Serialize};
 use snafu::{OptionExt, ResultExt, Snafu};
 use std::sync::Arc;
@@ -66,14 +66,8 @@ enum Error {
         path: String,
     },
 
-    #[snafu(display("Error performing delete request {}: {}", path, source))]
-    DeleteRequest {
-        source: crate::client::retry::Error,
-        path: String,
-    },
-
-    #[snafu(display("Error performing put request {}: {}", path, source))]
-    PutRequest {
+    #[snafu(display("Error performing request {}: {}", path, source))]
+    Request {
         source: crate::client::retry::Error,
         path: String,
     },
@@ -120,9 +114,9 @@ enum Error {
 impl From<Error> for crate::Error {
     fn from(err: Error) -> Self {
         match err {
-            Error::GetRequest { source, path }
-            | Error::DeleteRequest { source, path }
-            | Error::PutRequest { source, path } => source.error(STORE, path),
+            Error::GetRequest { source, path } | Error::Request { source, path 
} => {
+                source.error(STORE, path)
+            }
             _ => Self::Generic {
                 store: STORE,
                 source: Box::new(err),
@@ -171,15 +165,15 @@ impl GoogleCloudStorageConfig {
 }
 
 /// A builder for a put request allowing customisation of the headers and 
query string
-pub struct PutRequest<'a> {
+pub struct Request<'a> {
     path: &'a Path,
     config: &'a GoogleCloudStorageConfig,
-    payload: PutPayload,
+    payload: Option<PutPayload>,
     builder: RequestBuilder,
     idempotent: bool,
 }
 
-impl<'a> PutRequest<'a> {
+impl<'a> Request<'a> {
     fn header(self, k: &HeaderName, v: &str) -> Self {
         let builder = self.builder.header(k, v);
         Self { builder, ..self }
@@ -190,26 +184,58 @@ impl<'a> PutRequest<'a> {
         Self { builder, ..self }
     }
 
-    fn set_idempotent(mut self, idempotent: bool) -> Self {
+    fn idempotent(mut self, idempotent: bool) -> Self {
         self.idempotent = idempotent;
         self
     }
 
-    async fn send(self) -> Result<PutResult> {
+    fn with_attributes(self, attributes: Attributes) -> Self {
+        let mut builder = self.builder;
+        let mut has_content_type = false;
+        for (k, v) in &attributes {
+            builder = match k {
+                Attribute::CacheControl => builder.header(CACHE_CONTROL, 
v.as_ref()),
+                Attribute::ContentType => {
+                    has_content_type = true;
+                    builder.header(CONTENT_TYPE, v.as_ref())
+                }
+            };
+        }
+
+        if !has_content_type {
+            let value = self.config.client_options.get_content_type(self.path);
+            builder = builder.header(CONTENT_TYPE, 
value.unwrap_or(DEFAULT_CONTENT_TYPE))
+        }
+        Self { builder, ..self }
+    }
+
+    fn with_payload(self, payload: PutPayload) -> Self {
+        let content_length = payload.content_length();
+        Self {
+            builder: self.builder.header(CONTENT_LENGTH, content_length),
+            payload: Some(payload),
+            ..self
+        }
+    }
+
+    async fn send(self) -> Result<Response> {
         let credential = self.config.credentials.get_credential().await?;
-        let response = self
+        let resp = self
             .builder
             .bearer_auth(&credential.bearer)
-            .header(CONTENT_LENGTH, self.payload.content_length())
             .retryable(&self.config.retry_config)
             .idempotent(self.idempotent)
-            .payload(Some(self.payload))
+            .payload(self.payload)
             .send()
             .await
-            .context(PutRequestSnafu {
+            .context(RequestSnafu {
                 path: self.path.as_ref(),
             })?;
+        Ok(resp)
+    }
 
+    async fn do_put(self) -> Result<PutResult> {
+        let response = self.send().await?;
         Ok(get_put_result(response.headers(), 
VERSION_HEADER).context(MetadataSnafu)?)
     }
 }
@@ -324,36 +350,13 @@ impl GoogleCloudStorageClient {
     /// Perform a put request 
<https://cloud.google.com/storage/docs/xml-api/put-object-upload>
     ///
     /// Returns the new ETag
-    pub fn put_request<'a>(
-        &'a self,
-        path: &'a Path,
-        payload: PutPayload,
-        attributes: Attributes,
-    ) -> PutRequest<'a> {
-        let url = self.object_url(path);
-        let mut builder = self.client.request(Method::PUT, url);
-
-        let mut has_content_type = false;
-        for (k, v) in &attributes {
-            builder = match k {
-                Attribute::CacheControl => builder.header(CACHE_CONTROL, 
v.as_ref()),
-                Attribute::ContentType => {
-                    has_content_type = true;
-                    builder.header(CONTENT_TYPE, v.as_ref())
-                }
-            };
-        }
+    pub fn request<'a>(&'a self, method: Method, path: &'a Path) -> 
Request<'a> {
+        let builder = self.client.request(method, self.object_url(path));
 
-        if !has_content_type {
-            let opts = &self.config.client_options;
-            let value = 
opts.get_content_type(path).unwrap_or(DEFAULT_CONTENT_TYPE);
-            builder = builder.header(CONTENT_TYPE, value)
-        }
-
-        PutRequest {
+        Request {
             path,
             builder,
-            payload,
+            payload: None,
             config: &self.config,
             idempotent: false,
         }
@@ -365,10 +368,13 @@ impl GoogleCloudStorageClient {
         payload: PutPayload,
         opts: PutOptions,
     ) -> Result<PutResult> {
-        let builder = self.put_request(path, payload, opts.attributes);
+        let builder = self
+            .request(Method::PUT, path)
+            .with_payload(payload)
+            .with_attributes(opts.attributes);
 
         let builder = match &opts.mode {
-            PutMode::Overwrite => builder.set_idempotent(true),
+            PutMode::Overwrite => builder.idempotent(true),
             PutMode::Create => builder.header(&VERSION_MATCH, "0"),
             PutMode::Update(v) => {
                 let etag = v.version.as_ref().context(MissingVersionSnafu)?;
@@ -376,7 +382,7 @@ impl GoogleCloudStorageClient {
             }
         };
 
-        match (opts.mode, builder.send().await) {
+        match (opts.mode, builder.do_put().await) {
             (PutMode::Create, Err(crate::Error::Precondition { path, source 
})) => {
                 Err(crate::Error::AlreadyExists { path, source })
             }
@@ -399,10 +405,11 @@ impl GoogleCloudStorageClient {
             ("uploadId", upload_id),
         ];
         let result = self
-            .put_request(path, data, Attributes::new())
+            .request(Method::PUT, path)
+            .with_payload(data)
             .query(query)
-            .set_idempotent(true)
-            .send()
+            .idempotent(true)
+            .do_put()
             .await?;
 
         Ok(PartId {
@@ -411,30 +418,18 @@ impl GoogleCloudStorageClient {
     }
 
     /// Initiate a multipart upload 
<https://cloud.google.com/storage/docs/xml-api/post-object-multipart>
-    pub async fn multipart_initiate(&self, path: &Path) -> Result<MultipartId> 
{
-        let credential = self.get_credential().await?;
-        let url = self.object_url(path);
-
-        let content_type = self
-            .config
-            .client_options
-            .get_content_type(path)
-            .unwrap_or("application/octet-stream");
-
+    pub async fn multipart_initiate(
+        &self,
+        path: &Path,
+        opts: PutMultipartOpts,
+    ) -> Result<MultipartId> {
         let response = self
-            .client
-            .request(Method::POST, &url)
-            .bearer_auth(&credential.bearer)
-            .header(header::CONTENT_TYPE, content_type)
-            .header(header::CONTENT_LENGTH, "0")
+            .request(Method::POST, path)
+            .with_attributes(opts.attributes)
+            .header(&CONTENT_LENGTH, "0")
             .query(&[("uploads", "")])
-            .retryable(&self.config.retry_config)
-            .idempotent(true)
             .send()
-            .await
-            .context(PutRequestSnafu {
-                path: path.as_ref(),
-            })?;
+            .await?;
 
         let data = response.bytes().await.context(PutResponseBodySnafu)?;
         let result: InitiateMultipartUploadResult =
@@ -451,12 +446,12 @@ impl GoogleCloudStorageClient {
         self.client
             .request(Method::DELETE, &url)
             .bearer_auth(&credential.bearer)
-            .header(header::CONTENT_TYPE, "application/octet-stream")
-            .header(header::CONTENT_LENGTH, "0")
+            .header(CONTENT_TYPE, "application/octet-stream")
+            .header(CONTENT_LENGTH, "0")
             .query(&[("uploadId", multipart_id)])
             .send_retry(&self.config.retry_config)
             .await
-            .context(PutRequestSnafu {
+            .context(RequestSnafu {
                 path: path.as_ref(),
             })?;
 
@@ -472,9 +467,9 @@ impl GoogleCloudStorageClient {
         if completed_parts.is_empty() {
             // GCS doesn't allow empty multipart uploads
             let result = self
-                .put_request(path, Default::default(), Attributes::new())
-                .set_idempotent(true)
-                .send()
+                .request(Method::PUT, path)
+                .idempotent(true)
+                .do_put()
                 .await?;
             self.multipart_cleanup(path, multipart_id).await?;
             return Ok(result);
@@ -523,18 +518,7 @@ impl GoogleCloudStorageClient {
 
     /// Perform a delete request 
<https://cloud.google.com/storage/docs/xml-api/delete-object>
     pub async fn delete_request(&self, path: &Path) -> Result<()> {
-        let credential = self.get_credential().await?;
-        let url = self.object_url(path);
-
-        let builder = self.client.request(Method::DELETE, url);
-        builder
-            .bearer_auth(&credential.bearer)
-            .send_retry(&self.config.retry_config)
-            .await
-            .context(DeleteRequestSnafu {
-                path: path.as_ref(),
-            })?;
-
+        self.request(Method::DELETE, path).send().await?;
         Ok(())
     }
 
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index af6e671cbc3..0ec6e7e8264 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -42,7 +42,8 @@ use crate::gcp::credential::GCSAuthorizer;
 use crate::signer::Signer;
 use crate::{
     multipart::PartId, path::Path, GetOptions, GetResult, ListResult, 
MultipartId, MultipartUpload,
-    ObjectMeta, ObjectStore, PutOptions, PutPayload, PutResult, Result, 
UploadPart,
+    ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, 
PutResult, Result,
+    UploadPart,
 };
 use async_trait::async_trait;
 use client::GoogleCloudStorageClient;
@@ -156,8 +157,12 @@ impl ObjectStore for GoogleCloudStorage {
         self.client.put(location, payload, opts).await
     }
 
-    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
-        let upload_id = self.client.multipart_initiate(location).await?;
+    async fn put_multipart_opts(
+        &self,
+        location: &Path,
+        opts: PutMultipartOpts,
+    ) -> Result<Box<dyn MultipartUpload>> {
+        let upload_id = self.client.multipart_initiate(location, opts).await?;
 
         Ok(Box::new(GCSMultipartUpload {
             part_idx: 0,
@@ -206,7 +211,9 @@ impl ObjectStore for GoogleCloudStorage {
 #[async_trait]
 impl MultipartStore for GoogleCloudStorage {
     async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
-        self.client.multipart_initiate(path).await
+        self.client
+            .multipart_initiate(path, PutMultipartOpts::default())
+            .await
     }
 
     async fn put_part(
diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs
index d6ba4f4d913..404211e578d 100644
--- a/object_store/src/http/mod.rs
+++ b/object_store/src/http/mod.rs
@@ -44,7 +44,7 @@ use crate::http::client::Client;
 use crate::path::Path;
 use crate::{
     ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, 
MultipartUpload, ObjectMeta,
-    ObjectStore, PutMode, PutOptions, PutPayload, PutResult, Result, 
RetryConfig,
+    ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, 
Result, RetryConfig,
 };
 
 mod client;
@@ -118,7 +118,11 @@ impl ObjectStore for HttpStore {
         })
     }
 
-    async fn put_multipart(&self, _location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
+    async fn put_multipart_opts(
+        &self,
+        _location: &Path,
+        _opts: PutMultipartOpts,
+    ) -> Result<Box<dyn MultipartUpload>> {
         Err(crate::Error::NotImplemented)
     }
 
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index b492d93894a..ad72bd29ef7 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -597,7 +597,20 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + 
Debug + 'static {
     ///
     /// Client should prefer [`ObjectStore::put`] for small payloads, as 
streaming uploads
     /// typically require multiple separate requests. See [`MultipartUpload`] 
for more information
-    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>>;
+    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
+        self.put_multipart_opts(location, PutMultipartOpts::default())
+            .await
+    }
+
+    /// Perform a multipart upload with options
+    ///
+    /// Client should prefer [`ObjectStore::put`] for small payloads, as 
streaming uploads
+    /// typically require multiple separate requests. See [`MultipartUpload`] 
for more information
+    async fn put_multipart_opts(
+        &self,
+        location: &Path,
+        opts: PutMultipartOpts,
+    ) -> Result<Box<dyn MultipartUpload>>;
 
     /// Return the bytes that are stored at the specified location.
     async fn get(&self, location: &Path) -> Result<GetResult> {
@@ -785,6 +798,14 @@ macro_rules! as_ref_impl {
                 self.as_ref().put_multipart(location).await
             }
 
+            async fn put_multipart_opts(
+                &self,
+                location: &Path,
+                opts: PutMultipartOpts,
+            ) -> Result<Box<dyn MultipartUpload>> {
+                self.as_ref().put_multipart_opts(location, opts).await
+            }
+
             async fn get(&self, location: &Path) -> Result<GetResult> {
                 self.as_ref().get(location).await
             }
@@ -1144,6 +1165,46 @@ impl From<TagSet> for PutOptions {
     }
 }
 
+impl From<Attributes> for PutOptions {
+    fn from(attributes: Attributes) -> Self {
+        Self {
+            attributes,
+            ..Default::default()
+        }
+    }
+}
+
+/// Options for [`ObjectStore::put_multipart_opts`]
+#[derive(Debug, Clone, PartialEq, Eq, Default)]
+pub struct PutMultipartOpts {
+    /// Provide a [`TagSet`] for this object
+    ///
+    /// Implementations that don't support object tagging should ignore this
+    pub tags: TagSet,
+    /// Provide a set of [`Attributes`]
+    ///
+    /// Implementations that don't support an attribute should return an error
+    pub attributes: Attributes,
+}
+
+impl From<TagSet> for PutMultipartOpts {
+    fn from(tags: TagSet) -> Self {
+        Self {
+            tags,
+            ..Default::default()
+        }
+    }
+}
+
+impl From<Attributes> for PutMultipartOpts {
+    fn from(attributes: Attributes) -> Self {
+        Self {
+            attributes,
+            ..Default::default()
+        }
+    }
+}
+
 /// Result for a put request
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct PutResult {
@@ -1688,10 +1749,7 @@ mod tests {
         ]);
 
         let path = Path::from("attributes");
-        let opts = PutOptions {
-            attributes: attributes.clone(),
-            ..Default::default()
-        };
+        let opts = attributes.clone().into();
         match integration.put_opts(&path, "foo".into(), opts).await {
             Ok(_) => {
                 let r = integration.get(&path).await.unwrap();
@@ -1700,6 +1758,19 @@ mod tests {
             Err(Error::NotImplemented) => {}
             Err(e) => panic!("{e}"),
         }
+
+        let opts = attributes.clone().into();
+        match integration.put_multipart_opts(&path, opts).await {
+            Ok(mut w) => {
+                w.put_part("foo".into()).await.unwrap();
+                w.complete().await.unwrap();
+
+                let r = integration.get(&path).await.unwrap();
+                assert_eq!(r.attributes, attributes);
+            }
+            Err(Error::NotImplemented) => {}
+            Err(e) => panic!("{e}"),
+        }
     }
 
     pub(crate) async fn get_opts(storage: &dyn ObjectStore) {
@@ -2332,21 +2403,32 @@ mod tests {
 
         let path = Path::from("tag_test");
         storage
-            .put_opts(&path, "test".into(), tag_set.into())
+            .put_opts(&path, "test".into(), tag_set.clone().into())
             .await
             .unwrap();
 
+        let multi_path = Path::from("tag_test_multi");
+        let mut write = storage
+            .put_multipart_opts(&multi_path, tag_set.into())
+            .await
+            .unwrap();
+
+        write.put_part("foo".into()).await.unwrap();
+        write.complete().await.unwrap();
+
         // Write should always succeed, but certain configurations may simply 
ignore tags
         if !validate {
             return;
         }
 
-        let resp = get_tags(path.clone()).await.unwrap();
-        let body = resp.bytes().await.unwrap();
+        for path in [path, multi_path] {
+            let resp = get_tags(path.clone()).await.unwrap();
+            let body = resp.bytes().await.unwrap();
 
-        let mut resp: Tagging = 
quick_xml::de::from_reader(body.reader()).unwrap();
-        resp.list.tags.sort_by(|a, b| a.key.cmp(&b.key));
-        assert_eq!(resp.list.tags, tags);
+            let mut resp: Tagging = 
quick_xml::de::from_reader(body.reader()).unwrap();
+            resp.list.tags.sort_by(|a, b| a.key.cmp(&b.key));
+            assert_eq!(resp.list.tags, tags);
+        }
     }
 
     async fn delete_fixtures(storage: &DynObjectStore) {
diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs
index b94aa05b8b6..f3e1d4296fe 100644
--- a/object_store/src/limit.rs
+++ b/object_store/src/limit.rs
@@ -19,7 +19,8 @@
 
 use crate::{
     BoxStream, GetOptions, GetResult, GetResultPayload, ListResult, 
MultipartUpload, ObjectMeta,
-    ObjectStore, Path, PutOptions, PutPayload, PutResult, Result, StreamExt, 
UploadPart,
+    ObjectStore, Path, PutMultipartOpts, PutOptions, PutPayload, PutResult, 
Result, StreamExt,
+    UploadPart,
 };
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -91,6 +92,19 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
             upload,
         }))
     }
+
+    async fn put_multipart_opts(
+        &self,
+        location: &Path,
+        opts: PutMultipartOpts,
+    ) -> Result<Box<dyn MultipartUpload>> {
+        let upload = self.inner.put_multipart_opts(location, opts).await?;
+        Ok(Box::new(LimitUpload {
+            semaphore: Arc::clone(&self.semaphore),
+            upload,
+        }))
+    }
+
     async fn get(&self, location: &Path) -> Result<GetResult> {
         let permit = 
Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
         let r = self.inner.get(location).await?;
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index a3695ad9174..8dec5bee0a2 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -39,7 +39,7 @@ use crate::{
     path::{absolute_path_to_url, Path},
     util::InvalidGetRange,
     Attributes, GetOptions, GetResult, GetResultPayload, ListResult, 
MultipartUpload, ObjectMeta,
-    ObjectStore, PutMode, PutOptions, PutPayload, PutResult, Result, 
UploadPart,
+    ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, 
Result, UploadPart,
 };
 
 /// A specialized `Error` for filesystem object store-related errors
@@ -404,7 +404,15 @@ impl ObjectStore for LocalFileSystem {
         .await
     }
 
-    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
+    async fn put_multipart_opts(
+        &self,
+        location: &Path,
+        opts: PutMultipartOpts,
+    ) -> Result<Box<dyn MultipartUpload>> {
+        if !opts.attributes.is_empty() {
+            return Err(crate::Error::NotImplemented);
+        }
+
         let dest = self.path_to_filesystem(location)?;
         let (file, src) = new_staged_upload(&dest)?;
         Ok(Box::new(LocalUpload::new(src, dest, file)))
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index e34b28fd27c..daf14e17510 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -31,8 +31,8 @@ use crate::multipart::{MultipartStore, PartId};
 use crate::util::InvalidGetRange;
 use crate::{
     path::Path, Attributes, GetRange, GetResult, GetResultPayload, ListResult, 
MultipartId,
-    MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutOptions, PutResult, 
Result,
-    UpdateVersion, UploadPart,
+    MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutMultipartOpts, 
PutOptions, PutResult,
+    Result, UpdateVersion, UploadPart,
 };
 use crate::{GetOptions, PutPayload};
 
@@ -223,9 +223,14 @@ impl ObjectStore for InMemory {
         })
     }
 
-    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
+    async fn put_multipart_opts(
+        &self,
+        location: &Path,
+        opts: PutMultipartOpts,
+    ) -> Result<Box<dyn MultipartUpload>> {
         Ok(Box::new(InMemoryUpload {
             location: location.clone(),
+            attributes: opts.attributes,
             parts: vec![],
             storage: Arc::clone(&self.storage),
         }))
@@ -487,6 +492,7 @@ impl InMemory {
 #[derive(Debug)]
 struct InMemoryUpload {
     location: Path,
+    attributes: Attributes,
     parts: Vec<PutPayload>,
     storage: Arc<RwLock<Storage>>,
 }
@@ -503,10 +509,11 @@ impl MultipartUpload for InMemoryUpload {
         let mut buf = Vec::with_capacity(cap);
         let parts = self.parts.iter().flatten();
         parts.for_each(|x| buf.extend_from_slice(x));
-        let etag = self
-            .storage
-            .write()
-            .insert(&self.location, buf.into(), Attributes::new());
+        let etag = self.storage.write().insert(
+            &self.location,
+            buf.into(),
+            std::mem::take(&mut self.attributes),
+        );
 
         Ok(PutResult {
             e_tag: Some(etag.to_string()),
diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs
index 1d1ffeed8c6..7c9ea5804c3 100644
--- a/object_store/src/prefix.rs
+++ b/object_store/src/prefix.rs
@@ -22,8 +22,8 @@ use std::ops::Range;
 
 use crate::path::Path;
 use crate::{
-    GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, 
ObjectStore, PutOptions,
-    PutPayload, PutResult, Result,
+    GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, 
ObjectStore, PutMultipartOpts,
+    PutOptions, PutPayload, PutResult, Result,
 };
 
 #[doc(hidden)]
@@ -100,6 +100,15 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
         self.inner.put_multipart(&full_path).await
     }
 
+    async fn put_multipart_opts(
+        &self,
+        location: &Path,
+        opts: PutMultipartOpts,
+    ) -> Result<Box<dyn MultipartUpload>> {
+        let full_path = self.full_path(location);
+        self.inner.put_multipart_opts(&full_path, opts).await
+    }
+
     async fn get(&self, location: &Path) -> Result<GetResult> {
         let full_path = self.full_path(location);
         self.inner.get(&full_path).await
diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs
index d089784668e..38b6d7c3bf4 100644
--- a/object_store/src/throttle.rs
+++ b/object_store/src/throttle.rs
@@ -23,7 +23,7 @@ use std::{convert::TryInto, sync::Arc};
 use crate::multipart::{MultipartStore, PartId};
 use crate::{
     path::Path, GetResult, GetResultPayload, ListResult, MultipartId, 
MultipartUpload, ObjectMeta,
-    ObjectStore, PutOptions, PutPayload, PutResult, Result,
+    ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
 };
 use crate::{GetOptions, UploadPart};
 use async_trait::async_trait;
@@ -171,6 +171,18 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
         }))
     }
 
+    async fn put_multipart_opts(
+        &self,
+        location: &Path,
+        opts: PutMultipartOpts,
+    ) -> Result<Box<dyn MultipartUpload>> {
+        let upload = self.inner.put_multipart_opts(location, opts).await?;
+        Ok(Box::new(ThrottledUpload {
+            upload,
+            sleep: self.config().wait_put_per_call,
+        }))
+    }
+
     async fn get(&self, location: &Path) -> Result<GetResult> {
         sleep(self.config().wait_get_per_call).await;
 
diff --git a/object_store/tests/get_range_file.rs 
b/object_store/tests/get_range_file.rs
index 59c59340045..c5550ac2172 100644
--- a/object_store/tests/get_range_file.rs
+++ b/object_store/tests/get_range_file.rs
@@ -46,7 +46,11 @@ impl ObjectStore for MyStore {
         self.0.put_opts(location, payload, opts).await
     }
 
-    async fn put_multipart(&self, _location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
+    async fn put_multipart_opts(
+        &self,
+        _location: &Path,
+        _opts: PutMultipartOpts,
+    ) -> Result<Box<dyn MultipartUpload>> {
         todo!()
     }
 

Reply via email to