This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 873176078ab Support non-contiguous put payloads / vectored writes 
(#5514) (#5538)
873176078ab is described below

commit 873176078abe54b4503ef15b8d1a2509095331e9
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Apr 15 14:50:19 2024 +0100

    Support non-contiguous put payloads / vectored writes (#5514) (#5538)
    
    * Support non-contiguous put payloads (#5514)
    
    * Docs
    
    * Add more docs
    
    * Review feedback
---
 object_store/src/aws/checksum.rs     |  15 --
 object_store/src/aws/client.rs       |  79 ++++-----
 object_store/src/aws/credential.rs   |   8 +-
 object_store/src/aws/dynamo.rs       |   6 +-
 object_store/src/aws/mod.rs          |  25 +--
 object_store/src/azure/client.rs     |  44 +++--
 object_store/src/azure/credential.rs |   8 +-
 object_store/src/azure/mod.rs        |  19 ++-
 object_store/src/buffered.rs         |  16 +-
 object_store/src/chunked.rs          |  16 +-
 object_store/src/client/retry.rs     | 165 ++++++++++--------
 object_store/src/gcp/client.rs       |  50 ++++--
 object_store/src/gcp/credential.rs   |   4 +-
 object_store/src/gcp/mod.rs          |  23 +--
 object_store/src/http/client.rs      |  23 ++-
 object_store/src/http/mod.rs         |  12 +-
 object_store/src/lib.rs              | 142 ++++++++++------
 object_store/src/limit.rs            |  17 +-
 object_store/src/local.rs            |  53 +++---
 object_store/src/memory.rs           |  34 ++--
 object_store/src/multipart.rs        |   5 +-
 object_store/src/payload.rs          | 314 +++++++++++++++++++++++++++++++++++
 object_store/src/prefix.rs           |  24 +--
 object_store/src/throttle.rs         |  30 ++--
 object_store/src/upload.rs           |  44 +++--
 object_store/tests/get_range_file.rs |  13 +-
 26 files changed, 843 insertions(+), 346 deletions(-)

diff --git a/object_store/src/aws/checksum.rs b/object_store/src/aws/checksum.rs
index a50bd2d18b9..d15bbf08df6 100644
--- a/object_store/src/aws/checksum.rs
+++ b/object_store/src/aws/checksum.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use crate::config::Parse;
-use ring::digest::{self, digest as ring_digest};
 use std::str::FromStr;
 
 #[allow(non_camel_case_types)]
@@ -27,20 +26,6 @@ pub enum Checksum {
     SHA256,
 }
 
-impl Checksum {
-    pub(super) fn digest(&self, bytes: &[u8]) -> Vec<u8> {
-        match self {
-            Self::SHA256 => ring_digest(&digest::SHA256, 
bytes).as_ref().to_owned(),
-        }
-    }
-
-    pub(super) fn header_name(&self) -> &'static str {
-        match self {
-            Self::SHA256 => "x-amz-checksum-sha256",
-        }
-    }
-}
-
 impl std::fmt::Display for Checksum {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match &self {
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 838bef8ac23..c1789ed143e 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -35,7 +35,8 @@ use crate::client::GetOptionsExt;
 use crate::multipart::PartId;
 use crate::path::DELIMITER;
 use crate::{
-    ClientOptions, GetOptions, ListResult, MultipartId, Path, PutResult, 
Result, RetryConfig,
+    ClientOptions, GetOptions, ListResult, MultipartId, Path, PutPayload, 
PutResult, Result,
+    RetryConfig,
 };
 use async_trait::async_trait;
 use base64::prelude::BASE64_STANDARD;
@@ -51,11 +52,14 @@ use reqwest::{
     header::{CONTENT_LENGTH, CONTENT_TYPE},
     Client as ReqwestClient, Method, RequestBuilder, Response,
 };
+use ring::digest;
+use ring::digest::Context;
 use serde::{Deserialize, Serialize};
 use snafu::{ResultExt, Snafu};
 use std::sync::Arc;
 
 const VERSION_HEADER: &str = "x-amz-version-id";
+const SHA256_CHECKSUM: &str = "x-amz-checksum-sha256";
 
 /// A specialized `Error` for object store-related errors
 #[derive(Debug, Snafu)]
@@ -266,7 +270,8 @@ pub(crate) struct Request<'a> {
     path: &'a Path,
     config: &'a S3Config,
     builder: RequestBuilder,
-    payload_sha256: Option<Vec<u8>>,
+    payload_sha256: Option<digest::Digest>,
+    payload: Option<PutPayload>,
     use_session_creds: bool,
     idempotent: bool,
 }
@@ -286,7 +291,7 @@ impl<'a> Request<'a> {
         Self { builder, ..self }
     }
 
-    pub fn set_idempotent(mut self, idempotent: bool) -> Self {
+    pub fn idempotent(mut self, idempotent: bool) -> Self {
         self.idempotent = idempotent;
         self
     }
@@ -301,10 +306,15 @@ impl<'a> Request<'a> {
             },
         };
 
+        let sha = self.payload_sha256.as_ref().map(|x| x.as_ref());
+
         let path = self.path.as_ref();
         self.builder
-            .with_aws_sigv4(credential.authorizer(), 
self.payload_sha256.as_deref())
-            .send_retry_with_idempotency(&self.config.retry_config, 
self.idempotent)
+            .with_aws_sigv4(credential.authorizer(), sha)
+            .retryable(&self.config.retry_config)
+            .idempotent(self.idempotent)
+            .payload(self.payload)
+            .send()
             .await
             .context(RetrySnafu { path })
     }
@@ -333,7 +343,7 @@ impl S3Client {
     pub fn put_request<'a>(
         &'a self,
         path: &'a Path,
-        bytes: Bytes,
+        payload: PutPayload,
         with_encryption_headers: bool,
     ) -> Request<'a> {
         let url = self.config.path_url(path);
@@ -341,20 +351,17 @@ impl S3Client {
         if with_encryption_headers {
             builder = 
builder.headers(self.config.encryption_headers.clone().into());
         }
-        let mut payload_sha256 = None;
 
-        if let Some(checksum) = self.config.checksum {
-            let digest = checksum.digest(&bytes);
-            builder = builder.header(checksum.header_name(), 
BASE64_STANDARD.encode(&digest));
-            if checksum == Checksum::SHA256 {
-                payload_sha256 = Some(digest);
-            }
-        }
+        let mut sha256 = Context::new(&digest::SHA256);
+        payload.iter().for_each(|x| sha256.update(x));
+        let payload_sha256 = sha256.finish();
 
-        builder = match bytes.is_empty() {
-            true => builder.header(CONTENT_LENGTH, 0), // Handle empty uploads 
(#4514)
-            false => builder.body(bytes),
-        };
+        if let Some(Checksum::SHA256) = self.config.checksum {
+            builder = builder.header(
+                "x-amz-checksum-sha256",
+                BASE64_STANDARD.encode(payload_sha256),
+            )
+        }
 
         if let Some(value) = self.config.client_options.get_content_type(path) 
{
             builder = builder.header(CONTENT_TYPE, value);
@@ -362,8 +369,9 @@ impl S3Client {
 
         Request {
             path,
-            builder,
-            payload_sha256,
+            builder: builder.header(CONTENT_LENGTH, payload.content_length()),
+            payload: Some(payload),
+            payload_sha256: Some(payload_sha256),
             config: &self.config,
             use_session_creds: true,
             idempotent: false,
@@ -446,16 +454,8 @@ impl S3Client {
 
         let mut builder = self.client.request(Method::POST, url);
 
-        // Compute checksum - S3 *requires* this for DeleteObjects requests, 
so we default to
-        // their algorithm if the user hasn't specified one.
-        let checksum = self.config.checksum.unwrap_or(Checksum::SHA256);
-        let digest = checksum.digest(&body);
-        builder = builder.header(checksum.header_name(), 
BASE64_STANDARD.encode(&digest));
-        let payload_sha256 = if checksum == Checksum::SHA256 {
-            Some(digest)
-        } else {
-            None
-        };
+        let digest = digest::digest(&digest::SHA256, &body);
+        builder = builder.header(SHA256_CHECKSUM, 
BASE64_STANDARD.encode(digest));
 
         // S3 *requires* DeleteObjects to include a Content-MD5 header:
         // 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
@@ -468,8 +468,8 @@ impl S3Client {
         let response = builder
             .header(CONTENT_TYPE, "application/xml")
             .body(body)
-            .with_aws_sigv4(credential.authorizer(), payload_sha256.as_deref())
-            .send_retry_with_idempotency(&self.config.retry_config, false)
+            .with_aws_sigv4(credential.authorizer(), Some(digest.as_ref()))
+            .send_retry(&self.config.retry_config)
             .await
             .context(DeleteObjectsRequestSnafu {})?
             .bytes()
@@ -515,6 +515,7 @@ impl S3Client {
             builder,
             path: from,
             config: &self.config,
+            payload: None,
             payload_sha256: None,
             use_session_creds: false,
             idempotent: false,
@@ -530,7 +531,9 @@ impl S3Client {
             .request(Method::POST, url)
             .headers(self.config.encryption_headers.clone().into())
             .with_aws_sigv4(credential.authorizer(), None)
-            .send_retry_with_idempotency(&self.config.retry_config, true)
+            .retryable(&self.config.retry_config)
+            .idempotent(true)
+            .send()
             .await
             .context(CreateMultipartRequestSnafu)?
             .bytes()
@@ -548,14 +551,14 @@ impl S3Client {
         path: &Path,
         upload_id: &MultipartId,
         part_idx: usize,
-        data: Bytes,
+        data: PutPayload,
     ) -> Result<PartId> {
         let part = (part_idx + 1).to_string();
 
         let response = self
             .put_request(path, data, false)
             .query(&[("partNumber", &part), ("uploadId", upload_id)])
-            .set_idempotent(true)
+            .idempotent(true)
             .send()
             .await?;
 
@@ -573,7 +576,7 @@ impl S3Client {
             // If no parts were uploaded, upload an empty part
             // otherwise the completion request will fail
             let part = self
-                .put_part(location, &upload_id.to_string(), 0, Bytes::new())
+                .put_part(location, &upload_id.to_string(), 0, 
PutPayload::default())
                 .await?;
             vec![part]
         } else {
@@ -591,7 +594,9 @@ impl S3Client {
             .query(&[("uploadId", upload_id)])
             .body(body)
             .with_aws_sigv4(credential.authorizer(), None)
-            .send_retry_with_idempotency(&self.config.retry_config, true)
+            .retryable(&self.config.retry_config)
+            .idempotent(true)
+            .send()
             .await
             .context(CompleteMultipartRequestSnafu)?;
 
diff --git a/object_store/src/aws/credential.rs 
b/object_store/src/aws/credential.rs
index a7d1a9772aa..08831fd5123 100644
--- a/object_store/src/aws/credential.rs
+++ b/object_store/src/aws/credential.rs
@@ -517,7 +517,9 @@ async fn instance_creds(
     let token_result = client
         .request(Method::PUT, token_url)
         .header("X-aws-ec2-metadata-token-ttl-seconds", "600") // 10 minute TTL
-        .send_retry_with_idempotency(retry_config, true)
+        .retryable(retry_config)
+        .idempotent(true)
+        .send()
         .await;
 
     let token = match token_result {
@@ -607,7 +609,9 @@ async fn web_identity(
             ("Version", "2011-06-15"),
             ("WebIdentityToken", &token),
         ])
-        .send_retry_with_idempotency(retry_config, true)
+        .retryable(retry_config)
+        .idempotent(true)
+        .send()
         .await?
         .bytes()
         .await?;
diff --git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs
index 2e60bbad222..2390187e7f7 100644
--- a/object_store/src/aws/dynamo.rs
+++ b/object_store/src/aws/dynamo.rs
@@ -186,11 +186,7 @@ impl DynamoCommit {
         to: &Path,
     ) -> Result<()> {
         self.conditional_op(client, to, None, || async {
-            client
-                .copy_request(from, to)
-                .set_idempotent(false)
-                .send()
-                .await?;
+            client.copy_request(from, to).send().await?;
             Ok(())
         })
         .await
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 16af4d3b410..9e741c9142d 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -29,7 +29,6 @@
 //! [automatic cleanup]: 
https://aws.amazon.com/blogs/aws/s3-lifecycle-management-update-support-for-multipart-uploads-and-delete-markers/
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use futures::stream::BoxStream;
 use futures::{StreamExt, TryStreamExt};
 use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
@@ -46,7 +45,7 @@ use crate::signer::Signer;
 use crate::util::STRICT_ENCODE_SET;
 use crate::{
     Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, 
ObjectMeta,
-    ObjectStore, Path, PutMode, PutOptions, PutResult, Result, UploadPart,
+    ObjectStore, Path, PutMode, PutOptions, PutPayload, PutResult, Result, 
UploadPart,
 };
 
 static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging");
@@ -151,15 +150,20 @@ impl Signer for AmazonS3 {
 
 #[async_trait]
 impl ObjectStore for AmazonS3 {
-    async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult> {
-        let mut request = self.client.put_request(location, bytes, true);
+    async fn put_opts(
+        &self,
+        location: &Path,
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> Result<PutResult> {
+        let mut request = self.client.put_request(location, payload, true);
         let tags = opts.tags.encoded();
         if !tags.is_empty() && !self.client.config.disable_tagging {
             request = request.header(&TAGS_HEADER, tags);
         }
 
         match (opts.mode, &self.client.config.conditional_put) {
-            (PutMode::Overwrite, _) => 
request.set_idempotent(true).do_put().await,
+            (PutMode::Overwrite, _) => request.idempotent(true).do_put().await,
             (PutMode::Create | PutMode::Update(_), None) => 
Err(Error::NotImplemented),
             (PutMode::Create, Some(S3ConditionalPut::ETagMatch)) => {
                 match request.header(&IF_NONE_MATCH, "*").do_put().await {
@@ -270,7 +274,7 @@ impl ObjectStore for AmazonS3 {
     async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
         self.client
             .copy_request(from, to)
-            .set_idempotent(true)
+            .idempotent(true)
             .send()
             .await?;
         Ok(())
@@ -320,7 +324,7 @@ struct UploadState {
 
 #[async_trait]
 impl MultipartUpload for S3MultiPartUpload {
-    fn put_part(&mut self, data: Bytes) -> UploadPart {
+    fn put_part(&mut self, data: PutPayload) -> UploadPart {
         let idx = self.part_idx;
         self.part_idx += 1;
         let state = Arc::clone(&self.state);
@@ -362,7 +366,7 @@ impl MultipartStore for AmazonS3 {
         path: &Path,
         id: &MultipartId,
         part_idx: usize,
-        data: Bytes,
+        data: PutPayload,
     ) -> Result<PartId> {
         self.client.put_part(path, id, part_idx, data).await
     }
@@ -385,7 +389,6 @@ impl MultipartStore for AmazonS3 {
 mod tests {
     use super::*;
     use crate::{client::get::GetClient, tests::*};
-    use bytes::Bytes;
     use hyper::HeaderMap;
 
     const NON_EXISTENT_NAME: &str = "nonexistentname";
@@ -474,7 +477,7 @@ mod tests {
         let integration = config.build().unwrap();
 
         let location = Path::from_iter([NON_EXISTENT_NAME]);
-        let data = Bytes::from("arbitrary data");
+        let data = PutPayload::from("arbitrary data");
 
         let err = integration.put(&location, data).await.unwrap_err();
         assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
@@ -531,7 +534,7 @@ mod tests {
     async fn s3_encryption(store: &AmazonS3) {
         crate::test_util::maybe_skip_integration!();
 
-        let data = Bytes::from(vec![3u8; 1024]);
+        let data = PutPayload::from(vec![3u8; 1024]);
 
         let encryption_headers: HeaderMap = 
store.client.config.encryption_headers.clone().into();
         let expected_encryption =
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index 0e6af50fbf9..d5972d0a8c1 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -27,8 +27,8 @@ use crate::multipart::PartId;
 use crate::path::DELIMITER;
 use crate::util::{deserialize_rfc1123, GetRange};
 use crate::{
-    ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode, 
PutOptions, PutResult,
-    Result, RetryConfig,
+    ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode, 
PutOptions, PutPayload,
+    PutResult, Result, RetryConfig,
 };
 use async_trait::async_trait;
 use base64::prelude::BASE64_STANDARD;
@@ -171,6 +171,7 @@ impl AzureConfig {
 struct PutRequest<'a> {
     path: &'a Path,
     config: &'a AzureConfig,
+    payload: PutPayload,
     builder: RequestBuilder,
     idempotent: bool,
 }
@@ -195,8 +196,12 @@ impl<'a> PutRequest<'a> {
         let credential = self.config.get_credential().await?;
         let response = self
             .builder
+            .header(CONTENT_LENGTH, self.payload.content_length())
             .with_azure_authorization(&credential, &self.config.account)
-            .send_retry_with_idempotency(&self.config.retry_config, 
self.idempotent)
+            .retryable(&self.config.retry_config)
+            .idempotent(true)
+            .payload(Some(self.payload))
+            .send()
             .await
             .context(PutRequestSnafu {
                 path: self.path.as_ref(),
@@ -228,7 +233,7 @@ impl AzureClient {
         self.config.get_credential().await
     }
 
-    fn put_request<'a>(&'a self, path: &'a Path, bytes: Bytes) -> 
PutRequest<'a> {
+    fn put_request<'a>(&'a self, path: &'a Path, payload: PutPayload) -> 
PutRequest<'a> {
         let url = self.config.path_url(path);
 
         let mut builder = self.client.request(Method::PUT, url);
@@ -237,21 +242,23 @@ impl AzureClient {
             builder = builder.header(CONTENT_TYPE, value);
         }
 
-        builder = builder
-            .header(CONTENT_LENGTH, HeaderValue::from(bytes.len()))
-            .body(bytes);
-
         PutRequest {
             path,
             builder,
+            payload,
             config: &self.config,
             idempotent: false,
         }
     }
 
     /// Make an Azure PUT request 
<https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob>
-    pub async fn put_blob(&self, path: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult> {
-        let builder = self.put_request(path, bytes);
+    pub async fn put_blob(
+        &self,
+        path: &Path,
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> Result<PutResult> {
+        let builder = self.put_request(path, payload);
 
         let builder = match &opts.mode {
             PutMode::Overwrite => builder.set_idempotent(true),
@@ -272,11 +279,16 @@ impl AzureClient {
     }
 
     /// PUT a block 
<https://learn.microsoft.com/en-us/rest/api/storageservices/put-block>
-    pub async fn put_block(&self, path: &Path, part_idx: usize, data: Bytes) 
-> Result<PartId> {
+    pub async fn put_block(
+        &self,
+        path: &Path,
+        part_idx: usize,
+        payload: PutPayload,
+    ) -> Result<PartId> {
         let content_id = format!("{part_idx:20}");
         let block_id = BASE64_STANDARD.encode(&content_id);
 
-        self.put_request(path, data)
+        self.put_request(path, payload)
             .query(&[("comp", "block"), ("blockid", &block_id)])
             .set_idempotent(true)
             .send()
@@ -349,7 +361,9 @@ impl AzureClient {
 
         builder
             .with_azure_authorization(&credential, &self.config.account)
-            .send_retry_with_idempotency(&self.config.retry_config, true)
+            .retryable(&self.config.retry_config)
+            .idempotent(overwrite)
+            .send()
             .await
             .map_err(|err| err.error(STORE, from.to_string()))?;
 
@@ -382,7 +396,9 @@ impl AzureClient {
             .body(body)
             .query(&[("restype", "service"), ("comp", "userdelegationkey")])
             .with_azure_authorization(&credential, &self.config.account)
-            .send_retry_with_idempotency(&self.config.retry_config, true)
+            .retryable(&self.config.retry_config)
+            .idempotent(true)
+            .send()
             .await
             .context(DelegationKeyRequestSnafu)?
             .bytes()
diff --git a/object_store/src/azure/credential.rs 
b/object_store/src/azure/credential.rs
index 36845bd1d64..c8212a9290f 100644
--- a/object_store/src/azure/credential.rs
+++ b/object_store/src/azure/credential.rs
@@ -615,7 +615,9 @@ impl TokenProvider for ClientSecretOAuthProvider {
                 ("scope", AZURE_STORAGE_SCOPE),
                 ("grant_type", "client_credentials"),
             ])
-            .send_retry_with_idempotency(retry, true)
+            .retryable(retry)
+            .idempotent(true)
+            .send()
             .await
             .context(TokenRequestSnafu)?
             .json()
@@ -797,7 +799,9 @@ impl TokenProvider for WorkloadIdentityOAuthProvider {
                 ("scope", AZURE_STORAGE_SCOPE),
                 ("grant_type", "client_credentials"),
             ])
-            .send_retry_with_idempotency(retry, true)
+            .retryable(retry)
+            .idempotent(true)
+            .send()
             .await
             .context(TokenRequestSnafu)?
             .json()
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 5d3a405ccc9..8dc52422b7d 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -27,10 +27,9 @@ use crate::{
     path::Path,
     signer::Signer,
     GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, 
ObjectMeta, ObjectStore,
-    PutOptions, PutResult, Result, UploadPart,
+    PutOptions, PutPayload, PutResult, Result, UploadPart,
 };
 use async_trait::async_trait;
-use bytes::Bytes;
 use futures::stream::BoxStream;
 use reqwest::Method;
 use std::fmt::Debug;
@@ -87,8 +86,13 @@ impl std::fmt::Display for MicrosoftAzure {
 
 #[async_trait]
 impl ObjectStore for MicrosoftAzure {
-    async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult> {
-        self.client.put_blob(location, bytes, opts).await
+    async fn put_opts(
+        &self,
+        location: &Path,
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> Result<PutResult> {
+        self.client.put_blob(location, payload, opts).await
     }
 
     async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
@@ -203,7 +207,7 @@ struct UploadState {
 
 #[async_trait]
 impl MultipartUpload for AzureMultiPartUpload {
-    fn put_part(&mut self, data: Bytes) -> UploadPart {
+    fn put_part(&mut self, data: PutPayload) -> UploadPart {
         let idx = self.part_idx;
         self.part_idx += 1;
         let state = Arc::clone(&self.state);
@@ -240,7 +244,7 @@ impl MultipartStore for MicrosoftAzure {
         path: &Path,
         _: &MultipartId,
         part_idx: usize,
-        data: Bytes,
+        data: PutPayload,
     ) -> Result<PartId> {
         self.client.put_block(path, part_idx, data).await
     }
@@ -265,6 +269,7 @@ impl MultipartStore for MicrosoftAzure {
 mod tests {
     use super::*;
     use crate::tests::*;
+    use bytes::Bytes;
 
     #[tokio::test]
     async fn azure_blob_test() {
@@ -309,7 +314,7 @@ mod tests {
 
         let data = Bytes::from("hello world");
         let path = Path::from("file.txt");
-        integration.put(&path, data.clone()).await.unwrap();
+        integration.put(&path, data.clone().into()).await.unwrap();
 
         let signed = integration
             .signed_url(Method::GET, &path, Duration::from_secs(60))
diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs
index de6d4eb1bb9..d41224177a3 100644
--- a/object_store/src/buffered.rs
+++ b/object_store/src/buffered.rs
@@ -18,7 +18,7 @@
 //! Utilities for performing tokio-style buffered IO
 
 use crate::path::Path;
-use crate::{ObjectMeta, ObjectStore, WriteMultipart};
+use crate::{ObjectMeta, ObjectStore, PutPayloadMut, WriteMultipart};
 use bytes::Bytes;
 use futures::future::{BoxFuture, FutureExt};
 use futures::ready;
@@ -231,7 +231,7 @@ impl std::fmt::Debug for BufWriter {
 
 enum BufWriterState {
     /// Buffer up to capacity bytes
-    Buffer(Path, Vec<u8>),
+    Buffer(Path, PutPayloadMut),
     /// [`ObjectStore::put_multipart`]
     Prepare(BoxFuture<'static, std::io::Result<WriteMultipart>>),
     /// Write to a multipart upload
@@ -252,7 +252,7 @@ impl BufWriter {
             capacity,
             store,
             max_concurrency: 8,
-            state: BufWriterState::Buffer(path, Vec::new()),
+            state: BufWriterState::Buffer(path, PutPayloadMut::new()),
         }
     }
 
@@ -303,14 +303,16 @@ impl AsyncWrite for BufWriter {
                     continue;
                 }
                 BufWriterState::Buffer(path, b) => {
-                    if b.len().saturating_add(buf.len()) >= cap {
+                    if b.content_length().saturating_add(buf.len()) >= cap {
                         let buffer = std::mem::take(b);
                         let path = std::mem::take(path);
                         let store = Arc::clone(&self.store);
                         self.state = BufWriterState::Prepare(Box::pin(async 
move {
                             let upload = store.put_multipart(&path).await?;
-                            let mut chunked = WriteMultipart::new(upload);
-                            chunked.write(&buffer);
+                            let mut chunked = 
WriteMultipart::new_with_chunk_size(upload, cap);
+                            for chunk in buffer.freeze() {
+                                chunked.put(chunk);
+                            }
                             Ok(chunked)
                         }));
                         continue;
@@ -391,7 +393,7 @@ mod tests {
         const BYTES: usize = 4096;
 
         let data: Bytes = 
b"12345678".iter().cycle().copied().take(BYTES).collect();
-        store.put(&existent, data.clone()).await.unwrap();
+        store.put(&existent, data.clone().into()).await.unwrap();
 
         let meta = store.head(&existent).await.unwrap();
 
diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs
index 6db7f4b35e2..9abe49dbfce 100644
--- a/object_store/src/chunked.rs
+++ b/object_store/src/chunked.rs
@@ -27,11 +27,11 @@ use futures::stream::BoxStream;
 use futures::StreamExt;
 
 use crate::path::Path;
-use crate::Result;
 use crate::{
     GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, 
ObjectMeta, ObjectStore,
     PutOptions, PutResult,
 };
+use crate::{PutPayload, Result};
 
 /// Wraps a [`ObjectStore`] and makes its get response return chunks
 /// in a controllable manner.
@@ -62,8 +62,13 @@ impl Display for ChunkedStore {
 
 #[async_trait]
 impl ObjectStore for ChunkedStore {
-    async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult> {
-        self.inner.put_opts(location, bytes, opts).await
+    async fn put_opts(
+        &self,
+        location: &Path,
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> Result<PutResult> {
+        self.inner.put_opts(location, payload, opts).await
     }
 
     async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
@@ -176,10 +181,7 @@ mod tests {
     async fn test_chunked_basic() {
         let location = Path::parse("test").unwrap();
         let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
-        store
-            .put(&location, Bytes::from(vec![0; 1001]))
-            .await
-            .unwrap();
+        store.put(&location, vec![0; 1001].into()).await.unwrap();
 
         for chunk_size in [10, 20, 31] {
             let store = ChunkedStore::new(Arc::clone(&store), chunk_size);
diff --git a/object_store/src/client/retry.rs b/object_store/src/client/retry.rs
index f3fa7153e93..5dfdd55b654 100644
--- a/object_store/src/client/retry.rs
+++ b/object_store/src/client/retry.rs
@@ -18,10 +18,10 @@
 //! A shared HTTP client implementation incorporating retries
 
 use crate::client::backoff::{Backoff, BackoffConfig};
+use crate::PutPayload;
 use futures::future::BoxFuture;
-use futures::FutureExt;
 use reqwest::header::LOCATION;
-use reqwest::{Response, StatusCode};
+use reqwest::{Client, Request, Response, StatusCode};
 use snafu::Error as SnafuError;
 use snafu::Snafu;
 use std::time::{Duration, Instant};
@@ -166,26 +166,57 @@ impl Default for RetryConfig {
     }
 }
 
-fn send_retry_impl(
-    builder: reqwest::RequestBuilder,
-    config: &RetryConfig,
-    is_idempotent: Option<bool>,
-) -> BoxFuture<'static, Result<Response>> {
-    let mut backoff = Backoff::new(&config.backoff);
-    let max_retries = config.max_retries;
-    let retry_timeout = config.retry_timeout;
+pub struct RetryableRequest {
+    client: Client,
+    request: Request,
 
-    let (client, req) = builder.build_split();
-    let req = req.expect("request must be valid");
-    let is_idempotent = is_idempotent.unwrap_or(req.method().is_safe());
+    max_retries: usize,
+    retry_timeout: Duration,
+    backoff: Backoff,
 
-    async move {
+    idempotent: Option<bool>,
+    payload: Option<PutPayload>,
+}
+
+impl RetryableRequest {
+    /// Set whether this request is idempotent
+    ///
+    /// An idempotent request will be retried on timeout even if the request
+    /// method is not 
[safe](https://datatracker.ietf.org/doc/html/rfc7231#section-4.2.1)
+    pub fn idempotent(self, idempotent: bool) -> Self {
+        Self {
+            idempotent: Some(idempotent),
+            ..self
+        }
+    }
+
+    /// Provide a [`PutPayload`]
+    pub fn payload(self, payload: Option<PutPayload>) -> Self {
+        Self { payload, ..self }
+    }
+
+    pub async fn send(self) -> Result<Response> {
+        let max_retries = self.max_retries;
+        let retry_timeout = self.retry_timeout;
         let mut retries = 0;
         let now = Instant::now();
 
+        let mut backoff = self.backoff;
+        let is_idempotent = self
+            .idempotent
+            .unwrap_or_else(|| self.request.method().is_safe());
+
         loop {
-            let s = req.try_clone().expect("request body must be cloneable");
-            match client.execute(s).await {
+            let mut request = self
+                .request
+                .try_clone()
+                .expect("request body must be cloneable");
+
+            if let Some(payload) = &self.payload {
+                *request.body_mut() = Some(payload.body());
+            }
+
+            match self.client.execute(request).await {
                 Ok(r) => match r.error_for_status_ref() {
                     Ok(_) if r.status().is_success() => return Ok(r),
                     Ok(r) if r.status() == StatusCode::NOT_MODIFIED => {
@@ -195,47 +226,44 @@ fn send_retry_impl(
                         })
                     }
                     Ok(r) => {
-                        let is_bare_redirect = r.status().is_redirection() && 
!r.headers().contains_key(LOCATION);
+                        let is_bare_redirect =
+                            r.status().is_redirection() && 
!r.headers().contains_key(LOCATION);
                         return match is_bare_redirect {
                             true => Err(Error::BareRedirect),
                             // Not actually sure if this is reachable, but 
here for completeness
                             false => Err(Error::Client {
                                 body: None,
                                 status: r.status(),
-                            })
-                        }
+                            }),
+                        };
                     }
                     Err(e) => {
                         let status = r.status();
                         if retries == max_retries
                             || now.elapsed() > retry_timeout
-                            || !status.is_server_error() {
-
+                            || !status.is_server_error()
+                        {
                             return Err(match status.is_client_error() {
                                 true => match r.text().await {
-                                    Ok(body) => {
-                                        Error::Client {
-                                            body: Some(body).filter(|b| 
!b.is_empty()),
-                                            status,
-                                        }
-                                    }
-                                    Err(e) => {
-                                        Error::Reqwest {
-                                            retries,
-                                            max_retries,
-                                            elapsed: now.elapsed(),
-                                            retry_timeout,
-                                            source: e,
-                                        }
-                                    }
-                                }
+                                    Ok(body) => Error::Client {
+                                        body: Some(body).filter(|b| 
!b.is_empty()),
+                                        status,
+                                    },
+                                    Err(e) => Error::Reqwest {
+                                        retries,
+                                        max_retries,
+                                        elapsed: now.elapsed(),
+                                        retry_timeout,
+                                        source: e,
+                                    },
+                                },
                                 false => Error::Reqwest {
                                     retries,
                                     max_retries,
                                     elapsed: now.elapsed(),
                                     retry_timeout,
                                     source: e,
-                                }
+                                },
                             });
                         }
 
@@ -251,13 +279,13 @@ fn send_retry_impl(
                         tokio::time::sleep(sleep).await;
                     }
                 },
-                Err(e) =>
-                {
+                Err(e) => {
                     let mut do_retry = false;
                     if e.is_connect()
                         || e.is_body()
                         || (e.is_request() && !e.is_timeout())
-                        || (is_idempotent && e.is_timeout()) {
+                        || (is_idempotent && e.is_timeout())
+                    {
                         do_retry = true
                     } else {
                         let mut source = e.source();
@@ -267,7 +295,7 @@ fn send_retry_impl(
                                     || e.is_incomplete_message()
                                     || e.is_body_write_aborted()
                                     || (is_idempotent && e.is_timeout());
-                                break
+                                break;
                             }
                             if let Some(e) = 
e.downcast_ref::<std::io::Error>() {
                                 if e.kind() == std::io::ErrorKind::TimedOut {
@@ -276,9 +304,9 @@ fn send_retry_impl(
                                     do_retry = matches!(
                                         e.kind(),
                                         std::io::ErrorKind::ConnectionReset
-                                        | std::io::ErrorKind::ConnectionAborted
-                                        | std::io::ErrorKind::BrokenPipe
-                                        | std::io::ErrorKind::UnexpectedEof
+                                            | 
std::io::ErrorKind::ConnectionAborted
+                                            | std::io::ErrorKind::BrokenPipe
+                                            | std::io::ErrorKind::UnexpectedEof
                                     );
                                 }
                                 break;
@@ -287,17 +315,14 @@ fn send_retry_impl(
                         }
                     }
 
-                    if retries == max_retries
-                        || now.elapsed() > retry_timeout
-                        || !do_retry {
-
+                    if retries == max_retries || now.elapsed() > retry_timeout 
|| !do_retry {
                         return Err(Error::Reqwest {
                             retries,
                             max_retries,
                             elapsed: now.elapsed(),
                             retry_timeout,
                             source: e,
-                        })
+                        });
                     }
                     let sleep = backoff.next();
                     retries += 1;
@@ -313,39 +338,39 @@ fn send_retry_impl(
             }
         }
     }
-    .boxed()
 }
 
 pub trait RetryExt {
+    /// Return a [`RetryableRequest`]
+    fn retryable(self, config: &RetryConfig) -> RetryableRequest;
+
     /// Dispatch a request with the given retry configuration
     ///
     /// # Panic
     ///
     /// This will panic if the request body is a stream
     fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, 
Result<Response>>;
-
-    /// Dispatch a request with the given retry configuration and idempotency
-    ///
-    /// # Panic
-    ///
-    /// This will panic if the request body is a stream
-    fn send_retry_with_idempotency(
-        self,
-        config: &RetryConfig,
-        is_idempotent: bool,
-    ) -> BoxFuture<'static, Result<Response>>;
 }
 
 impl RetryExt for reqwest::RequestBuilder {
-    fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, 
Result<Response>> {
-        send_retry_impl(self, config, None)
+    fn retryable(self, config: &RetryConfig) -> RetryableRequest {
+        let (client, request) = self.build_split();
+        let request = request.expect("request must be valid");
+
+        RetryableRequest {
+            client,
+            request,
+            max_retries: config.max_retries,
+            retry_timeout: config.retry_timeout,
+            backoff: Backoff::new(&config.backoff),
+            idempotent: None,
+            payload: None,
+        }
     }
-    fn send_retry_with_idempotency(
-        self,
-        config: &RetryConfig,
-        is_idempotent: bool,
-    ) -> BoxFuture<'static, Result<Response>> {
-        send_retry_impl(self, config, Some(is_idempotent))
+
+    fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, 
Result<Response>> {
+        let request = self.retryable(config);
+        Box::pin(async move { request.send().await })
     }
 }
 
diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs
index 4aed81a94c8..f91217f6f9a 100644
--- a/object_store/src/gcp/client.rs
+++ b/object_store/src/gcp/client.rs
@@ -29,13 +29,14 @@ use crate::multipart::PartId;
 use crate::path::{Path, DELIMITER};
 use crate::util::hex_encode;
 use crate::{
-    ClientOptions, GetOptions, ListResult, MultipartId, PutMode, PutOptions, 
PutResult, Result,
-    RetryConfig,
+    ClientOptions, GetOptions, ListResult, MultipartId, PutMode, PutOptions, 
PutPayload, PutResult,
+    Result, RetryConfig,
 };
 use async_trait::async_trait;
 use base64::prelude::BASE64_STANDARD;
 use base64::Engine;
-use bytes::{Buf, Bytes};
+use bytes::Buf;
+use hyper::header::CONTENT_LENGTH;
 use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC};
 use reqwest::header::HeaderName;
 use reqwest::{header, Client, Method, RequestBuilder, Response, StatusCode};
@@ -172,6 +173,7 @@ impl GoogleCloudStorageConfig {
 pub struct PutRequest<'a> {
     path: &'a Path,
     config: &'a GoogleCloudStorageConfig,
+    payload: PutPayload,
     builder: RequestBuilder,
     idempotent: bool,
 }
@@ -197,7 +199,11 @@ impl<'a> PutRequest<'a> {
         let response = self
             .builder
             .bearer_auth(&credential.bearer)
-            .send_retry_with_idempotency(&self.config.retry_config, 
self.idempotent)
+            .header(CONTENT_LENGTH, self.payload.content_length())
+            .retryable(&self.config.retry_config)
+            .idempotent(self.idempotent)
+            .payload(Some(self.payload))
+            .send()
             .await
             .context(PutRequestSnafu {
                 path: self.path.as_ref(),
@@ -287,7 +293,9 @@ impl GoogleCloudStorageClient {
             .post(&url)
             .bearer_auth(&credential.bearer)
             .json(&body)
-            .send_retry_with_idempotency(&self.config.retry_config, true)
+            .retryable(&self.config.retry_config)
+            .idempotent(true)
+            .send()
             .await
             .context(SignBlobRequestSnafu)?;
 
@@ -315,7 +323,7 @@ impl GoogleCloudStorageClient {
     /// Perform a put request 
<https://cloud.google.com/storage/docs/xml-api/put-object-upload>
     ///
     /// Returns the new ETag
-    pub fn put_request<'a>(&'a self, path: &'a Path, payload: Bytes) -> 
PutRequest<'a> {
+    pub fn put_request<'a>(&'a self, path: &'a Path, payload: PutPayload) -> 
PutRequest<'a> {
         let url = self.object_url(path);
 
         let content_type = self
@@ -327,20 +335,24 @@ impl GoogleCloudStorageClient {
         let builder = self
             .client
             .request(Method::PUT, url)
-            .header(header::CONTENT_TYPE, content_type)
-            .header(header::CONTENT_LENGTH, payload.len())
-            .body(payload);
+            .header(header::CONTENT_TYPE, content_type);
 
         PutRequest {
             path,
             builder,
+            payload,
             config: &self.config,
             idempotent: false,
         }
     }
 
-    pub async fn put(&self, path: &Path, data: Bytes, opts: PutOptions) -> 
Result<PutResult> {
-        let builder = self.put_request(path, data);
+    pub async fn put(
+        &self,
+        path: &Path,
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> Result<PutResult> {
+        let builder = self.put_request(path, payload);
 
         let builder = match &opts.mode {
             PutMode::Overwrite => builder.set_idempotent(true),
@@ -367,7 +379,7 @@ impl GoogleCloudStorageClient {
         path: &Path,
         upload_id: &MultipartId,
         part_idx: usize,
-        data: Bytes,
+        data: PutPayload,
     ) -> Result<PartId> {
         let query = &[
             ("partNumber", &format!("{}", part_idx + 1)),
@@ -403,7 +415,9 @@ impl GoogleCloudStorageClient {
             .header(header::CONTENT_TYPE, content_type)
             .header(header::CONTENT_LENGTH, "0")
             .query(&[("uploads", "")])
-            .send_retry_with_idempotency(&self.config.retry_config, true)
+            .retryable(&self.config.retry_config)
+            .idempotent(true)
+            .send()
             .await
             .context(PutRequestSnafu {
                 path: path.as_ref(),
@@ -472,7 +486,9 @@ impl GoogleCloudStorageClient {
             .bearer_auth(&credential.bearer)
             .query(&[("uploadId", upload_id)])
             .body(data)
-            .send_retry_with_idempotency(&self.config.retry_config, true)
+            .retryable(&self.config.retry_config)
+            .idempotent(true)
+            .send()
             .await
             .context(CompleteMultipartRequestSnafu)?;
 
@@ -530,8 +546,10 @@ impl GoogleCloudStorageClient {
             .bearer_auth(&credential.bearer)
             // Needed if reqwest is compiled with native-tls instead of 
rustls-tls
             // See https://github.com/apache/arrow-rs/pull/3921
-            .header(header::CONTENT_LENGTH, 0)
-            .send_retry_with_idempotency(&self.config.retry_config, 
!if_not_exists)
+            .header(CONTENT_LENGTH, 0)
+            .retryable(&self.config.retry_config)
+            .idempotent(!if_not_exists)
+            .send()
             .await
             .map_err(|err| match err.status() {
                 Some(StatusCode::PRECONDITION_FAILED) => 
crate::Error::AlreadyExists {
diff --git a/object_store/src/gcp/credential.rs 
b/object_store/src/gcp/credential.rs
index abb04173f69..d7fc2cea2c7 100644
--- a/object_store/src/gcp/credential.rs
+++ b/object_store/src/gcp/credential.rs
@@ -623,7 +623,9 @@ impl TokenProvider for AuthorizedUserCredentials {
                 ("client_secret", &self.client_secret),
                 ("refresh_token", &self.refresh_token),
             ])
-            .send_retry_with_idempotency(retry, true)
+            .retryable(retry)
+            .idempotent(true)
+            .send()
             .await
             .context(TokenRequestSnafu)?
             .json::<TokenResponse>()
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index 96afa45f2b6..149da76f559 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -42,10 +42,9 @@ use crate::gcp::credential::GCSAuthorizer;
 use crate::signer::Signer;
 use crate::{
     multipart::PartId, path::Path, GetOptions, GetResult, ListResult, 
MultipartId, MultipartUpload,
-    ObjectMeta, ObjectStore, PutOptions, PutResult, Result, UploadPart,
+    ObjectMeta, ObjectStore, PutOptions, PutPayload, PutResult, Result, 
UploadPart,
 };
 use async_trait::async_trait;
-use bytes::Bytes;
 use client::GoogleCloudStorageClient;
 use futures::stream::BoxStream;
 use hyper::Method;
@@ -115,14 +114,14 @@ struct UploadState {
 
 #[async_trait]
 impl MultipartUpload for GCSMultipartUpload {
-    fn put_part(&mut self, data: Bytes) -> UploadPart {
+    fn put_part(&mut self, payload: PutPayload) -> UploadPart {
         let idx = self.part_idx;
         self.part_idx += 1;
         let state = Arc::clone(&self.state);
         Box::pin(async move {
             let part = state
                 .client
-                .put_part(&state.path, &state.multipart_id, idx, data)
+                .put_part(&state.path, &state.multipart_id, idx, payload)
                 .await?;
             state.parts.put(idx, part);
             Ok(())
@@ -148,8 +147,13 @@ impl MultipartUpload for GCSMultipartUpload {
 
 #[async_trait]
 impl ObjectStore for GoogleCloudStorage {
-    async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult> {
-        self.client.put(location, bytes, opts).await
+    async fn put_opts(
+        &self,
+        location: &Path,
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> Result<PutResult> {
+        self.client.put(location, payload, opts).await
     }
 
     async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
@@ -210,9 +214,9 @@ impl MultipartStore for GoogleCloudStorage {
         path: &Path,
         id: &MultipartId,
         part_idx: usize,
-        data: Bytes,
+        payload: PutPayload,
     ) -> Result<PartId> {
-        self.client.put_part(path, id, part_idx, data).await
+        self.client.put_part(path, id, part_idx, payload).await
     }
 
     async fn complete_multipart(
@@ -260,7 +264,6 @@ impl Signer for GoogleCloudStorage {
 #[cfg(test)]
 mod test {
 
-    use bytes::Bytes;
     use credential::DEFAULT_GCS_BASE_URL;
 
     use crate::tests::*;
@@ -391,7 +394,7 @@ mod test {
         let integration = 
config.with_bucket_name(NON_EXISTENT_NAME).build().unwrap();
 
         let location = Path::from_iter([NON_EXISTENT_NAME]);
-        let data = Bytes::from("arbitrary data");
+        let data = PutPayload::from("arbitrary data");
 
         let err = integration
             .put(&location, data)
diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs
index fdc8751c1ca..39f68ece65a 100644
--- a/object_store/src/http/client.rs
+++ b/object_store/src/http/client.rs
@@ -21,10 +21,11 @@ use crate::client::retry::{self, RetryConfig, RetryExt};
 use crate::client::GetOptionsExt;
 use crate::path::{Path, DELIMITER};
 use crate::util::deserialize_rfc1123;
-use crate::{ClientOptions, GetOptions, ObjectMeta, Result};
+use crate::{ClientOptions, GetOptions, ObjectMeta, PutPayload, Result};
 use async_trait::async_trait;
-use bytes::{Buf, Bytes};
+use bytes::Buf;
 use chrono::{DateTime, Utc};
+use hyper::header::CONTENT_LENGTH;
 use percent_encoding::percent_decode_str;
 use reqwest::header::CONTENT_TYPE;
 use reqwest::{Method, Response, StatusCode};
@@ -156,16 +157,24 @@ impl Client {
         Ok(())
     }
 
-    pub async fn put(&self, location: &Path, bytes: Bytes) -> Result<Response> 
{
+    pub async fn put(&self, location: &Path, payload: PutPayload) -> 
Result<Response> {
         let mut retry = false;
         loop {
             let url = self.path_url(location);
-            let mut builder = self.client.put(url).body(bytes.clone());
+            let mut builder = self.client.put(url);
             if let Some(value) = 
self.client_options.get_content_type(location) {
                 builder = builder.header(CONTENT_TYPE, value);
             }
 
-            match builder.send_retry(&self.retry_config).await {
+            let resp = builder
+                .header(CONTENT_LENGTH, payload.content_length())
+                .retryable(&self.retry_config)
+                .idempotent(true)
+                .payload(Some(payload.clone()))
+                .send()
+                .await;
+
+            match resp {
                 Ok(response) => return Ok(response),
                 Err(source) => match source.status() {
                     // Some implementations return 404 instead of 409
@@ -189,7 +198,9 @@ impl Client {
             .client
             .request(method, url)
             .header("Depth", depth)
-            .send_retry_with_idempotency(&self.retry_config, true)
+            .retryable(&self.retry_config)
+            .idempotent(true)
+            .send()
             .await;
 
         let response = match result {
diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs
index 626337df27f..a838a0f479d 100644
--- a/object_store/src/http/mod.rs
+++ b/object_store/src/http/mod.rs
@@ -32,7 +32,6 @@
 //! [WebDAV]: https://en.wikipedia.org/wiki/WebDAV
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use futures::stream::BoxStream;
 use futures::{StreamExt, TryStreamExt};
 use itertools::Itertools;
@@ -45,7 +44,7 @@ use crate::http::client::Client;
 use crate::path::Path;
 use crate::{
     ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, 
MultipartUpload, ObjectMeta,
-    ObjectStore, PutMode, PutOptions, PutResult, Result, RetryConfig,
+    ObjectStore, PutMode, PutOptions, PutPayload, PutResult, Result, 
RetryConfig,
 };
 
 mod client;
@@ -95,13 +94,18 @@ impl std::fmt::Display for HttpStore {
 
 #[async_trait]
 impl ObjectStore for HttpStore {
-    async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult> {
+    async fn put_opts(
+        &self,
+        location: &Path,
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> Result<PutResult> {
         if opts.mode != PutMode::Overwrite {
             // TODO: Add support for If header - 
https://datatracker.ietf.org/doc/html/rfc2518#section-9.4
             return Err(crate::Error::NotImplemented);
         }
 
-        let response = self.client.put(location, bytes).await?;
+        let response = self.client.put(location, payload).await?;
         let e_tag = match get_etag(response.headers()) {
             Ok(e_tag) => Some(e_tag),
             Err(crate::client::header::Error::MissingEtag) => None,
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 97604a7dce6..157852ff9a6 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -245,15 +245,14 @@
 //! # }
 //! ```
 //!
-//! #  Put Object
+//! # Put Object
 //!
 //! Use the [`ObjectStore::put`] method to atomically write data.
 //!
 //! ```
 //! # use object_store::local::LocalFileSystem;
-//! # use object_store::ObjectStore;
+//! # use object_store::{ObjectStore, PutPayload};
 //! # use std::sync::Arc;
-//! # use bytes::Bytes;
 //! # use object_store::path::Path;
 //! # fn get_object_store() -> Arc<dyn ObjectStore> {
 //! #   Arc::new(LocalFileSystem::new())
@@ -262,12 +261,12 @@
 //! #
 //! let object_store: Arc<dyn ObjectStore> = get_object_store();
 //! let path = Path::from("data/file1");
-//! let bytes = Bytes::from_static(b"hello");
-//! object_store.put(&path, bytes).await.unwrap();
+//! let payload = PutPayload::from_static(b"hello");
+//! object_store.put(&path, payload).await.unwrap();
 //! # }
 //! ```
 //!
-//! #  Multipart Upload
+//! # Multipart Upload
 //!
 //! Use the [`ObjectStore::put_multipart`] method to atomically write a large 
amount of data
 //!
@@ -320,6 +319,48 @@
 //! # }
 //! ```
 //!
+//! # Vectored Write
+//!
+//! When writing data it is often the case that the size of the output is not 
known ahead of time.
+//!
+//! A common approach to handling this is to bump-allocate a `Vec`, whereby 
the underlying
+//! allocation is repeatedly reallocated, each time doubling the capacity. The 
performance of
+//! this is suboptimal as reallocating memory will often involve copying it to 
a new location.
+//!
+//! Fortunately, as [`PutPayload`] does not require memory regions to be 
contiguous, it is
+//! possible to instead allocate memory in chunks and avoid bump allocating. 
[`PutPayloadMut`]
+//! encapsulates this approach.
+//!
+//! ```
+//! # use object_store::local::LocalFileSystem;
+//! # use object_store::{ObjectStore, PutPayloadMut};
+//! # use std::sync::Arc;
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::path::Path;
+//! # fn get_object_store() -> Arc<dyn ObjectStore> {
+//! #   Arc::new(LocalFileSystem::new())
+//! # }
+//! # async fn multi_upload() {
+//! #
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/large_file");
+//! let mut buffer = PutPayloadMut::new().with_block_size(8192);
+//! for _ in 0..22 {
+//!     buffer.extend_from_slice(&[0; 1024]);
+//! }
+//! let payload = buffer.freeze();
+//!
+//! // Payload consists of 3 separate 8KB allocations
+//! assert_eq!(payload.as_ref().len(), 3);
+//! assert_eq!(payload.as_ref()[0].len(), 8192);
+//! assert_eq!(payload.as_ref()[1].len(), 8192);
+//! assert_eq!(payload.as_ref()[2].len(), 6144);
+//!
+//! object_store.put(&path, payload).await.unwrap();
+//! # }
+//! ```
+//!
 //! # Conditional Fetch
 //!
 //! More complex object retrieval can be supported by 
[`ObjectStore::get_opts`].
@@ -427,7 +468,7 @@
 //!     let new = do_update(r.bytes().await.unwrap());
 //!
 //!     // Attempt to commit transaction
-//!     match store.put_opts(&path, new, 
PutMode::Update(version).into()).await {
+//!     match store.put_opts(&path, new.into(), 
PutMode::Update(version).into()).await {
 //!         Ok(_) => break, // Successfully committed
 //!         Err(Error::Precondition { .. }) => continue, // Object has 
changed, try again
 //!         Err(e) => panic!("{e}")
@@ -498,17 +539,18 @@ pub use tags::TagSet;
 
 pub mod multipart;
 mod parse;
+mod payload;
 mod upload;
 mod util;
 
 pub use parse::{parse_url, parse_url_opts};
+pub use payload::*;
 pub use upload::*;
-pub use util::GetRange;
+pub use util::{coalesce_ranges, collect_bytes, GetRange, 
OBJECT_STORE_COALESCE_DEFAULT};
 
 use crate::path::Path;
 #[cfg(not(target_arch = "wasm32"))]
 use crate::util::maybe_spawn_blocking;
-pub use crate::util::{coalesce_ranges, collect_bytes, 
OBJECT_STORE_COALESCE_DEFAULT};
 use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
@@ -532,14 +574,20 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + 
Debug + 'static {
     /// Save the provided bytes to the specified location
     ///
     /// The operation is guaranteed to be atomic, it will either successfully
-    /// write the entirety of `bytes` to `location`, or fail. No clients
+    /// write the entirety of `payload` to `location`, or fail. No clients
     /// should be able to observe a partially written object
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
-        self.put_opts(location, bytes, PutOptions::default()).await
+    async fn put(&self, location: &Path, payload: PutPayload) -> 
Result<PutResult> {
+        self.put_opts(location, payload, PutOptions::default())
+            .await
     }
 
-    /// Save the provided bytes to the specified location with the given 
options
-    async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult>;
+    /// Save the provided `payload` to `location` with the given options
+    async fn put_opts(
+        &self,
+        location: &Path,
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> Result<PutResult>;
 
     /// Perform a multipart upload
     ///
@@ -616,11 +664,10 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + 
Debug + 'static {
     /// # use object_store::{ObjectStore, ObjectMeta};
     /// # use object_store::path::Path;
     /// # use futures::{StreamExt, TryStreamExt};
-    /// # use bytes::Bytes;
     /// #
     /// // Create two objects
-    /// store.put(&Path::from("foo"), Bytes::from("foo")).await?;
-    /// store.put(&Path::from("bar"), Bytes::from("bar")).await?;
+    /// store.put(&Path::from("foo"), "foo".into()).await?;
+    /// store.put(&Path::from("bar"), "bar".into()).await?;
     ///
     /// // List object
     /// let locations = store.list(None).map_ok(|m| m.location).boxed();
@@ -717,17 +764,17 @@ macro_rules! as_ref_impl {
     ($type:ty) => {
         #[async_trait]
         impl ObjectStore for $type {
-            async fn put(&self, location: &Path, bytes: Bytes) -> 
Result<PutResult> {
-                self.as_ref().put(location, bytes).await
+            async fn put(&self, location: &Path, payload: PutPayload) -> 
Result<PutResult> {
+                self.as_ref().put(location, payload).await
             }
 
             async fn put_opts(
                 &self,
                 location: &Path,
-                bytes: Bytes,
+                payload: PutPayload,
                 opts: PutOptions,
             ) -> Result<PutResult> {
-                self.as_ref().put_opts(location, bytes, opts).await
+                self.as_ref().put_opts(location, payload, opts).await
             }
 
             async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
@@ -1219,8 +1266,7 @@ mod tests {
         let location = Path::from("test_dir/test_file.json");
 
         let data = Bytes::from("arbitrary data");
-        let expected_data = data.clone();
-        storage.put(&location, data).await.unwrap();
+        storage.put(&location, data.clone().into()).await.unwrap();
 
         let root = Path::from("/");
 
@@ -1263,14 +1309,14 @@ mod tests {
         assert!(content_list.is_empty());
 
         let read_data = 
storage.get(&location).await.unwrap().bytes().await.unwrap();
-        assert_eq!(&*read_data, expected_data);
+        assert_eq!(&*read_data, data);
 
         // Test range request
         let range = 3..7;
         let range_result = storage.get_range(&location, range.clone()).await;
 
         let bytes = range_result.unwrap();
-        assert_eq!(bytes, expected_data.slice(range.clone()));
+        assert_eq!(bytes, data.slice(range.clone()));
 
         let opts = GetOptions {
             range: Some(GetRange::Bounded(2..5)),
@@ -1348,11 +1394,11 @@ mod tests {
         let ranges = vec![0..1, 2..3, 0..5];
         let bytes = storage.get_ranges(&location, &ranges).await.unwrap();
         for (range, bytes) in ranges.iter().zip(bytes) {
-            assert_eq!(bytes, expected_data.slice(range.clone()))
+            assert_eq!(bytes, data.slice(range.clone()))
         }
 
         let head = storage.head(&location).await.unwrap();
-        assert_eq!(head.size, expected_data.len());
+        assert_eq!(head.size, data.len());
 
         storage.delete(&location).await.unwrap();
 
@@ -1369,7 +1415,7 @@ mod tests {
 
         let file_with_delimiter = Path::from_iter(["a", "b/c", "foo.file"]);
         storage
-            .put(&file_with_delimiter, Bytes::from("arbitrary"))
+            .put(&file_with_delimiter, "arbitrary".into())
             .await
             .unwrap();
 
@@ -1409,10 +1455,7 @@ mod tests {
 
         let emoji_prefix = Path::from("🙀");
         let emoji_file = Path::from("🙀/😀.parquet");
-        storage
-            .put(&emoji_file, Bytes::from("arbitrary"))
-            .await
-            .unwrap();
+        storage.put(&emoji_file, "arbitrary".into()).await.unwrap();
 
         storage.head(&emoji_file).await.unwrap();
         storage
@@ -1464,7 +1507,7 @@ mod tests {
         let hello_prefix = Path::parse("%48%45%4C%4C%4F").unwrap();
         let path = hello_prefix.child("foo.parquet");
 
-        storage.put(&path, Bytes::from(vec![0, 1])).await.unwrap();
+        storage.put(&path, vec![0, 1].into()).await.unwrap();
         let files = flatten_list_stream(storage, Some(&hello_prefix))
             .await
             .unwrap();
@@ -1504,7 +1547,7 @@ mod tests {
 
         // Can also write non-percent encoded sequences
         let path = Path::parse("%Q.parquet").unwrap();
-        storage.put(&path, Bytes::from(vec![0, 1])).await.unwrap();
+        storage.put(&path, vec![0, 1].into()).await.unwrap();
 
         let files = flatten_list_stream(storage, None).await.unwrap();
         assert_eq!(files, vec![path.clone()]);
@@ -1512,7 +1555,7 @@ mod tests {
         storage.delete(&path).await.unwrap();
 
         let path = Path::parse("foo bar/I contain spaces.parquet").unwrap();
-        storage.put(&path, Bytes::from(vec![0, 1])).await.unwrap();
+        storage.put(&path, vec![0, 1].into()).await.unwrap();
         storage.head(&path).await.unwrap();
 
         let files = flatten_list_stream(storage, Some(&Path::from("foo bar")))
@@ -1622,7 +1665,7 @@ mod tests {
         delete_fixtures(storage).await;
 
         let path = Path::from("empty");
-        storage.put(&path, Bytes::new()).await.unwrap();
+        storage.put(&path, PutPayload::default()).await.unwrap();
         let meta = storage.head(&path).await.unwrap();
         assert_eq!(meta.size, 0);
         let data = storage.get(&path).await.unwrap().bytes().await.unwrap();
@@ -1879,7 +1922,7 @@ mod tests {
         let data = get_chunks(5 * 1024 * 1024, 3);
         let bytes_expected = data.concat();
         let mut upload = storage.put_multipart(&location).await.unwrap();
-        let uploads = data.into_iter().map(|x| upload.put_part(x));
+        let uploads = data.into_iter().map(|x| upload.put_part(x.into()));
         futures::future::try_join_all(uploads).await.unwrap();
 
         // Object should not yet exist in store
@@ -1928,7 +1971,7 @@ mod tests {
         // We can abort an in-progress write
         let mut upload = storage.put_multipart(&location).await.unwrap();
         upload
-            .put_part(data.first().unwrap().clone())
+            .put_part(data.first().unwrap().clone().into())
             .await
             .unwrap();
 
@@ -1953,7 +1996,7 @@ mod tests {
         let location1 = Path::from("foo/x.json");
         let location2 = Path::from("foo.bar/y.json");
 
-        let data = Bytes::from("arbitrary data");
+        let data = PutPayload::from("arbitrary data");
         storage.put(&location1, data.clone()).await.unwrap();
         storage.put(&location2, data).await.unwrap();
 
@@ -2011,8 +2054,7 @@ mod tests {
         .collect();
 
         for f in &files {
-            let data = data.clone();
-            storage.put(f, data).await.unwrap();
+            storage.put(f, data.clone().into()).await.unwrap();
         }
 
         // ==================== check: prefix-list `mydb/wb` (directory) 
====================
@@ -2076,15 +2118,15 @@ mod tests {
         let contents2 = Bytes::from("dogs");
 
         // copy() make both objects identical
-        storage.put(&path1, contents1.clone()).await.unwrap();
-        storage.put(&path2, contents2.clone()).await.unwrap();
+        storage.put(&path1, contents1.clone().into()).await.unwrap();
+        storage.put(&path2, contents2.clone().into()).await.unwrap();
         storage.copy(&path1, &path2).await.unwrap();
         let new_contents = 
storage.get(&path2).await.unwrap().bytes().await.unwrap();
         assert_eq!(&new_contents, &contents1);
 
         // rename() copies contents and deletes original
-        storage.put(&path1, contents1.clone()).await.unwrap();
-        storage.put(&path2, contents2.clone()).await.unwrap();
+        storage.put(&path1, contents1.clone().into()).await.unwrap();
+        storage.put(&path2, contents2.clone().into()).await.unwrap();
         storage.rename(&path1, &path2).await.unwrap();
         let new_contents = 
storage.get(&path2).await.unwrap().bytes().await.unwrap();
         assert_eq!(&new_contents, &contents1);
@@ -2104,8 +2146,8 @@ mod tests {
         let contents2 = Bytes::from("dogs");
 
         // copy_if_not_exists() errors if destination already exists
-        storage.put(&path1, contents1.clone()).await.unwrap();
-        storage.put(&path2, contents2.clone()).await.unwrap();
+        storage.put(&path1, contents1.clone().into()).await.unwrap();
+        storage.put(&path2, contents2.clone().into()).await.unwrap();
         let result = storage.copy_if_not_exists(&path1, &path2).await;
         assert!(result.is_err());
         assert!(matches!(
@@ -2133,7 +2175,7 @@ mod tests {
 
         // Create destination object
         let path2 = Path::from("test2");
-        storage.put(&path2, Bytes::from("hello")).await.unwrap();
+        storage.put(&path2, "hello".into()).await.unwrap();
 
         // copy() errors if source does not exist
         let result = storage.copy(&path1, &path2).await;
@@ -2164,7 +2206,7 @@ mod tests {
 
         let parts: Vec<_> = futures::stream::iter(chunks)
             .enumerate()
-            .map(|(idx, b)| multipart.put_part(&path, &id, idx, b))
+            .map(|(idx, b)| multipart.put_part(&path, &id, idx, b.into()))
             .buffered(2)
             .try_collect()
             .await
@@ -2204,7 +2246,7 @@ mod tests {
 
         let data = Bytes::from("hello world");
         let path = Path::from("file.txt");
-        integration.put(&path, data.clone()).await.unwrap();
+        integration.put(&path, data.clone().into()).await.unwrap();
 
         let signed = integration
             .signed_url(Method::GET, &path, Duration::from_secs(60))
diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs
index e5f6841638e..b94aa05b8b6 100644
--- a/object_store/src/limit.rs
+++ b/object_store/src/limit.rs
@@ -19,7 +19,7 @@
 
 use crate::{
     BoxStream, GetOptions, GetResult, GetResultPayload, ListResult, 
MultipartUpload, ObjectMeta,
-    ObjectStore, Path, PutOptions, PutResult, Result, StreamExt, UploadPart,
+    ObjectStore, Path, PutOptions, PutPayload, PutResult, Result, StreamExt, 
UploadPart,
 };
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -70,14 +70,19 @@ impl<T: ObjectStore> std::fmt::Display for LimitStore<T> {
 
 #[async_trait]
 impl<T: ObjectStore> ObjectStore for LimitStore<T> {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
+    async fn put(&self, location: &Path, payload: PutPayload) -> 
Result<PutResult> {
         let _permit = self.semaphore.acquire().await.unwrap();
-        self.inner.put(location, bytes).await
+        self.inner.put(location, payload).await
     }
 
-    async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult> {
+    async fn put_opts(
+        &self,
+        location: &Path,
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> Result<PutResult> {
         let _permit = self.semaphore.acquire().await.unwrap();
-        self.inner.put_opts(location, bytes, opts).await
+        self.inner.put_opts(location, payload, opts).await
     }
     async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
         let upload = self.inner.put_multipart(location).await?;
@@ -232,7 +237,7 @@ impl LimitUpload {
 
 #[async_trait]
 impl MultipartUpload for LimitUpload {
-    fn put_part(&mut self, data: Bytes) -> UploadPart {
+    fn put_part(&mut self, data: PutPayload) -> UploadPart {
         let upload = self.upload.put_part(data);
         let s = Arc::clone(&self.semaphore);
         Box::pin(async move {
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index 6cc0c672af4..0d7c279b319 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -39,7 +39,7 @@ use crate::{
     path::{absolute_path_to_url, Path},
     util::InvalidGetRange,
     GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, 
ObjectMeta, ObjectStore,
-    PutMode, PutOptions, PutResult, Result, UploadPart,
+    PutMode, PutOptions, PutPayload, PutResult, Result, UploadPart,
 };
 
 /// A specialized `Error` for filesystem object store-related errors
@@ -336,7 +336,12 @@ fn is_valid_file_path(path: &Path) -> bool {
 
 #[async_trait]
 impl ObjectStore for LocalFileSystem {
-    async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult> {
+    async fn put_opts(
+        &self,
+        location: &Path,
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> Result<PutResult> {
         if matches!(opts.mode, PutMode::Update(_)) {
             return Err(crate::Error::NotImplemented);
         }
@@ -346,7 +351,7 @@ impl ObjectStore for LocalFileSystem {
             let (mut file, staging_path) = new_staged_upload(&path)?;
             let mut e_tag = None;
 
-            let err = match file.write_all(&bytes) {
+            let err = match payload.iter().try_for_each(|x| file.write_all(x)) 
{
                 Ok(_) => {
                     let metadata = file.metadata().map_err(|e| Error::Metadata 
{
                         source: e.into(),
@@ -724,9 +729,9 @@ impl LocalUpload {
 
 #[async_trait]
 impl MultipartUpload for LocalUpload {
-    fn put_part(&mut self, data: Bytes) -> UploadPart {
+    fn put_part(&mut self, data: PutPayload) -> UploadPart {
         let offset = self.offset;
-        self.offset += data.len() as u64;
+        self.offset += data.content_length() as u64;
 
         let s = Arc::clone(&self.state);
         maybe_spawn_blocking(move || {
@@ -734,7 +739,11 @@ impl MultipartUpload for LocalUpload {
             let file = f.as_mut().context(AbortedSnafu)?;
             file.seek(SeekFrom::Start(offset))
                 .context(SeekSnafu { path: &s.dest })?;
-            file.write_all(&data).context(UnableToCopyDataToFileSnafu)?;
+
+            data.iter()
+                .try_for_each(|x| file.write_all(x))
+                .context(UnableToCopyDataToFileSnafu)?;
+
             Ok(())
         })
         .boxed()
@@ -1016,8 +1025,8 @@ mod tests {
             // Can't use stream_get test as WriteMultipart uses a tokio JoinSet
             let p = Path::from("manual_upload");
             let mut upload = integration.put_multipart(&p).await.unwrap();
-            upload.put_part(Bytes::from_static(b"123")).await.unwrap();
-            upload.put_part(Bytes::from_static(b"45678")).await.unwrap();
+            upload.put_part("123".into()).await.unwrap();
+            upload.put_part("45678".into()).await.unwrap();
             let r = upload.complete().await.unwrap();
 
             let get = integration.get(&p).await.unwrap();
@@ -1035,9 +1044,11 @@ mod tests {
         let location = Path::from("nested/file/test_file");
 
         let data = Bytes::from("arbitrary data");
-        let expected_data = data.clone();
 
-        integration.put(&location, data).await.unwrap();
+        integration
+            .put(&location, data.clone().into())
+            .await
+            .unwrap();
 
         let read_data = integration
             .get(&location)
@@ -1046,7 +1057,7 @@ mod tests {
             .bytes()
             .await
             .unwrap();
-        assert_eq!(&*read_data, expected_data);
+        assert_eq!(&*read_data, data);
     }
 
     #[tokio::test]
@@ -1057,9 +1068,11 @@ mod tests {
         let location = Path::from("some_file");
 
         let data = Bytes::from("arbitrary data");
-        let expected_data = data.clone();
 
-        integration.put(&location, data).await.unwrap();
+        integration
+            .put(&location, data.clone().into())
+            .await
+            .unwrap();
 
         let read_data = integration
             .get(&location)
@@ -1068,7 +1081,7 @@ mod tests {
             .bytes()
             .await
             .unwrap();
-        assert_eq!(&*read_data, expected_data);
+        assert_eq!(&*read_data, data);
     }
 
     #[tokio::test]
@@ -1260,7 +1273,7 @@ mod tests {
 
         // Adding a file through a symlink creates in both paths
         integration
-            .put(&Path::from("b/file.parquet"), Bytes::from(vec![0, 1, 2]))
+            .put(&Path::from("b/file.parquet"), vec![0, 1, 2].into())
             .await
             .unwrap();
 
@@ -1279,7 +1292,7 @@ mod tests {
         let directory = Path::from("directory");
         let object = directory.child("child.txt");
         let data = Bytes::from("arbitrary");
-        integration.put(&object, data.clone()).await.unwrap();
+        integration.put(&object, data.clone().into()).await.unwrap();
         integration.head(&object).await.unwrap();
         let result = integration.get(&object).await.unwrap();
         assert_eq!(result.bytes().await.unwrap(), data);
@@ -1319,7 +1332,7 @@ mod tests {
         let integration = 
LocalFileSystem::new_with_prefix(root.path()).unwrap();
         let location = Path::from("some_file");
 
-        let data = Bytes::from("arbitrary data");
+        let data = PutPayload::from("arbitrary data");
         let mut u1 = integration.put_multipart(&location).await.unwrap();
         u1.put_part(data.clone()).await.unwrap();
 
@@ -1418,12 +1431,10 @@ mod tests {
 #[cfg(test)]
 mod not_wasm_tests {
     use std::time::Duration;
-
-    use bytes::Bytes;
     use tempfile::TempDir;
 
     use crate::local::LocalFileSystem;
-    use crate::{ObjectStore, Path};
+    use crate::{ObjectStore, Path, PutPayload};
 
     #[tokio::test]
     async fn test_cleanup_intermediate_files() {
@@ -1431,7 +1442,7 @@ mod not_wasm_tests {
         let integration = 
LocalFileSystem::new_with_prefix(root.path()).unwrap();
 
         let location = Path::from("some_file");
-        let data = Bytes::from_static(b"hello");
+        let data = PutPayload::from_static(b"hello");
         let mut upload = integration.put_multipart(&location).await.unwrap();
         upload.put_part(data).await.unwrap();
 
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index 6c960d4f24f..d42e6f231c0 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -29,11 +29,11 @@ use snafu::{OptionExt, ResultExt, Snafu};
 
 use crate::multipart::{MultipartStore, PartId};
 use crate::util::InvalidGetRange;
-use crate::GetOptions;
 use crate::{
     path::Path, GetRange, GetResult, GetResultPayload, ListResult, 
MultipartId, MultipartUpload,
     ObjectMeta, ObjectStore, PutMode, PutOptions, PutResult, Result, 
UpdateVersion, UploadPart,
 };
+use crate::{GetOptions, PutPayload};
 
 /// A specialized `Error` for in-memory object store-related errors
 #[derive(Debug, Snafu)]
@@ -192,10 +192,15 @@ impl std::fmt::Display for InMemory {
 
 #[async_trait]
 impl ObjectStore for InMemory {
-    async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult> {
+    async fn put_opts(
+        &self,
+        location: &Path,
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> Result<PutResult> {
         let mut storage = self.storage.write();
         let etag = storage.next_etag;
-        let entry = Entry::new(bytes, Utc::now(), etag);
+        let entry = Entry::new(payload.into(), Utc::now(), etag);
 
         match opts.mode {
             PutMode::Overwrite => storage.overwrite(location, entry),
@@ -391,14 +396,14 @@ impl MultipartStore for InMemory {
         _path: &Path,
         id: &MultipartId,
         part_idx: usize,
-        data: Bytes,
+        payload: PutPayload,
     ) -> Result<PartId> {
         let mut storage = self.storage.write();
         let upload = storage.upload_mut(id)?;
         if part_idx <= upload.parts.len() {
             upload.parts.resize(part_idx + 1, None);
         }
-        upload.parts[part_idx] = Some(data);
+        upload.parts[part_idx] = Some(payload.into());
         Ok(PartId {
             content_id: Default::default(),
         })
@@ -471,21 +476,22 @@ impl InMemory {
 #[derive(Debug)]
 struct InMemoryUpload {
     location: Path,
-    parts: Vec<Bytes>,
+    parts: Vec<PutPayload>,
     storage: Arc<RwLock<Storage>>,
 }
 
 #[async_trait]
 impl MultipartUpload for InMemoryUpload {
-    fn put_part(&mut self, data: Bytes) -> UploadPart {
-        self.parts.push(data);
+    fn put_part(&mut self, payload: PutPayload) -> UploadPart {
+        self.parts.push(payload);
         Box::pin(futures::future::ready(Ok(())))
     }
 
     async fn complete(&mut self) -> Result<PutResult> {
-        let cap = self.parts.iter().map(|x| x.len()).sum();
+        let cap = self.parts.iter().map(|x| x.content_length()).sum();
         let mut buf = Vec::with_capacity(cap);
-        self.parts.iter().for_each(|x| buf.extend_from_slice(x));
+        let parts = self.parts.iter().flatten();
+        parts.for_each(|x| buf.extend_from_slice(x));
         let etag = self.storage.write().insert(&self.location, buf.into());
         Ok(PutResult {
             e_tag: Some(etag.to_string()),
@@ -552,9 +558,11 @@ mod tests {
         let location = Path::from("some_file");
 
         let data = Bytes::from("arbitrary data");
-        let expected_data = data.clone();
 
-        integration.put(&location, data).await.unwrap();
+        integration
+            .put(&location, data.clone().into())
+            .await
+            .unwrap();
 
         let read_data = integration
             .get(&location)
@@ -563,7 +571,7 @@ mod tests {
             .bytes()
             .await
             .unwrap();
-        assert_eq!(&*read_data, expected_data);
+        assert_eq!(&*read_data, data);
     }
 
     const NON_EXISTENT_NAME: &str = "nonexistentname";
diff --git a/object_store/src/multipart.rs b/object_store/src/multipart.rs
index 26cce393624..d94e7f15051 100644
--- a/object_store/src/multipart.rs
+++ b/object_store/src/multipart.rs
@@ -22,10 +22,9 @@
 //! especially useful when dealing with large files or high-throughput systems.
 
 use async_trait::async_trait;
-use bytes::Bytes;
 
 use crate::path::Path;
-use crate::{MultipartId, PutResult, Result};
+use crate::{MultipartId, PutPayload, PutResult, Result};
 
 /// Represents a part of a file that has been successfully uploaded in a 
multipart upload process.
 #[derive(Debug, Clone)]
@@ -64,7 +63,7 @@ pub trait MultipartStore: Send + Sync + 'static {
         path: &Path,
         id: &MultipartId,
         part_idx: usize,
-        data: Bytes,
+        data: PutPayload,
     ) -> Result<PartId>;
 
     /// Completes a multipart upload
diff --git a/object_store/src/payload.rs b/object_store/src/payload.rs
new file mode 100644
index 00000000000..486bea3ea91
--- /dev/null
+++ b/object_store/src/payload.rs
@@ -0,0 +1,314 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Bytes;
+use std::sync::Arc;
+
/// A cheaply cloneable, ordered collection of [`Bytes`]
///
/// Cloning only increments the reference count of the shared `Arc`; the
/// underlying byte buffers themselves are never copied.
#[derive(Debug, Clone)]
pub struct PutPayload(Arc<[Bytes]>);

impl Default for PutPayload {
    // Returns an empty payload: zero chunks, zero bytes
    fn default() -> Self {
        // `Arc::new([])` builds an `Arc<[Bytes; 0]>` that coerces to
        // `Arc<[Bytes]>` without allocating any element storage
        Self(Arc::new([]))
    }
}
+
+impl PutPayload {
+    /// Create a new empty [`PutPayload`]
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Creates a [`PutPayload`] from a static slice
+    pub fn from_static(s: &'static [u8]) -> Self {
+        s.into()
+    }
+
+    /// Creates a [`PutPayload`] from a [`Bytes`]
+    pub fn from_bytes(s: Bytes) -> Self {
+        s.into()
+    }
+
+    #[cfg(feature = "cloud")]
+    pub(crate) fn body(&self) -> reqwest::Body {
+        reqwest::Body::wrap_stream(futures::stream::iter(
+            self.clone().into_iter().map(Ok::<_, crate::Error>),
+        ))
+    }
+
+    /// Returns the total length of the [`Bytes`] in this payload
+    pub fn content_length(&self) -> usize {
+        self.0.iter().map(|b| b.len()).sum()
+    }
+
+    /// Returns an iterator over the [`Bytes`] in this payload
+    pub fn iter(&self) -> PutPayloadIter<'_> {
+        PutPayloadIter(self.0.iter())
+    }
+}
+
+impl AsRef<[Bytes]> for PutPayload {
+    fn as_ref(&self) -> &[Bytes] {
+        self.0.as_ref()
+    }
+}
+
+impl<'a> IntoIterator for &'a PutPayload {
+    type Item = &'a Bytes;
+    type IntoIter = PutPayloadIter<'a>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.iter()
+    }
+}
+
+impl IntoIterator for PutPayload {
+    type Item = Bytes;
+    type IntoIter = PutPayloadIntoIter;
+
+    fn into_iter(self) -> Self::IntoIter {
+        PutPayloadIntoIter {
+            payload: self,
+            idx: 0,
+        }
+    }
+}
+
+/// An iterator over [`PutPayload`]
+#[derive(Debug)]
+pub struct PutPayloadIter<'a>(std::slice::Iter<'a, Bytes>);
+
+impl<'a> Iterator for PutPayloadIter<'a> {
+    type Item = &'a Bytes;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.0.next()
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.0.size_hint()
+    }
+}
+
+/// An owning iterator of [`PutPayload`]
+#[derive(Debug)]
+pub struct PutPayloadIntoIter {
+    payload: PutPayload,
+    idx: usize,
+}
+
+impl Iterator for PutPayloadIntoIter {
+    type Item = Bytes;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let p = self.payload.0.get(self.idx)?.clone();
+        self.idx += 1;
+        Some(p)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        let l = self.payload.0.len() - self.idx;
+        (l, Some(l))
+    }
+}
+
+impl From<Bytes> for PutPayload {
+    fn from(value: Bytes) -> Self {
+        Self(Arc::new([value]))
+    }
+}
+
+impl From<Vec<u8>> for PutPayload {
+    fn from(value: Vec<u8>) -> Self {
+        Self(Arc::new([value.into()]))
+    }
+}
+
+impl From<&'static str> for PutPayload {
+    fn from(value: &'static str) -> Self {
+        Bytes::from(value).into()
+    }
+}
+
+impl From<&'static [u8]> for PutPayload {
+    fn from(value: &'static [u8]) -> Self {
+        Bytes::from(value).into()
+    }
+}
+
+impl From<String> for PutPayload {
+    fn from(value: String) -> Self {
+        Bytes::from(value).into()
+    }
+}
+
+impl FromIterator<u8> for PutPayload {
+    fn from_iter<T: IntoIterator<Item = u8>>(iter: T) -> Self {
+        Bytes::from_iter(iter).into()
+    }
+}
+
+impl FromIterator<Bytes> for PutPayload {
+    fn from_iter<T: IntoIterator<Item = Bytes>>(iter: T) -> Self {
+        Self(iter.into_iter().collect())
+    }
+}
+
+impl From<PutPayload> for Bytes {
+    fn from(value: PutPayload) -> Self {
+        match value.0.len() {
+            0 => Self::new(),
+            1 => value.0[0].clone(),
+            _ => {
+                let mut buf = Vec::with_capacity(value.content_length());
+                value.iter().for_each(|x| buf.extend_from_slice(x));
+                buf.into()
+            }
+        }
+    }
+}
+
/// A builder for [`PutPayload`] that avoids reallocating memory
///
/// Data is allocated in fixed blocks, which are flushed to [`Bytes`] once full.
/// Unlike [`Vec`] this avoids needing to repeatedly reallocate blocks of memory,
/// which typically involves copying all the previously written data to a new
/// contiguous memory region.
#[derive(Debug)]
pub struct PutPayloadMut {
    // Total number of bytes written, across `completed` and `in_progress`
    len: usize,
    // Blocks that have been filled and frozen into `Bytes`
    completed: Vec<Bytes>,
    // The current, partially filled block
    in_progress: Vec<u8>,
    // Minimum capacity allocated for each new block
    block_size: usize,
}

impl Default for PutPayloadMut {
    fn default() -> Self {
        Self {
            len: 0,
            completed: vec![],
            in_progress: vec![],
            // Default minimum block size: 8KB
            block_size: 8 * 1024,
        }
    }
}
+
+impl PutPayloadMut {
+    /// Create a new [`PutPayloadMut`]
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Configures the minimum allocation size
+    ///
+    /// Defaults to 8KB
+    pub fn with_block_size(self, block_size: usize) -> Self {
+        Self { block_size, ..self }
+    }
+
+    /// Write bytes into this [`PutPayloadMut`]
+    ///
+    /// If there is an in-progress block, data will be first written to it, 
flushing
+    /// it to [`Bytes`] once full. If data remains to be written, a new block 
of memory
+    /// of at least the configured block size will be allocated, to hold the 
remaining data.
+    pub fn extend_from_slice(&mut self, slice: &[u8]) {
+        let remaining = self.in_progress.capacity() - self.in_progress.len();
+        let to_copy = remaining.min(slice.len());
+
+        self.in_progress.extend_from_slice(&slice[..to_copy]);
+        if self.in_progress.capacity() == self.in_progress.len() {
+            let new_cap = self.block_size.max(slice.len() - to_copy);
+            let completed = std::mem::replace(&mut self.in_progress, 
Vec::with_capacity(new_cap));
+            if !completed.is_empty() {
+                self.completed.push(completed.into())
+            }
+            self.in_progress.extend_from_slice(&slice[to_copy..])
+        }
+        self.len += slice.len();
+    }
+
+    /// Append a [`Bytes`] to this [`PutPayloadMut`] without copying
+    ///
+    /// This will close any currently buffered block populated by 
[`Self::extend_from_slice`],
+    /// and append `bytes` to this payload without copying.
+    pub fn push(&mut self, bytes: Bytes) {
+        if !self.in_progress.is_empty() {
+            let completed = std::mem::take(&mut self.in_progress);
+            self.completed.push(completed.into())
+        }
+        self.completed.push(bytes)
+    }
+
+    /// Returns `true` if this [`PutPayloadMut`] contains no bytes
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    /// Returns the total length of the [`Bytes`] in this payload
+    #[inline]
+    pub fn content_length(&self) -> usize {
+        self.len
+    }
+
+    /// Convert into [`PutPayload`]
+    pub fn freeze(mut self) -> PutPayload {
+        if !self.in_progress.is_empty() {
+            let completed = std::mem::take(&mut self.in_progress).into();
+            self.completed.push(completed);
+        }
+        PutPayload(self.completed.into())
+    }
+}
+
+impl From<PutPayloadMut> for PutPayload {
+    fn from(value: PutPayloadMut) -> Self {
+        value.freeze()
+    }
+}
+
#[cfg(test)]
mod test {
    use crate::PutPayloadMut;

    #[test]
    fn test_put_payload() {
        // Use a small block size so block boundaries are easy to reason about
        let mut builder = PutPayloadMut::new().with_block_size(23);
        builder.extend_from_slice(&[1; 16]);
        builder.extend_from_slice(&[2; 32]);
        builder.extend_from_slice(&[2; 5]);
        builder.extend_from_slice(&[2; 21]);
        builder.extend_from_slice(&[2; 40]);
        builder.extend_from_slice(&[0; 0]); // empty write is a no-op
        builder.push("foobar".into());

        let payload = builder.freeze();
        assert_eq!(payload.content_length(), 120);

        // 16 + 32 fills block 0 (23) and spills the remaining 25 into a
        // larger-than-minimum block; subsequent writes fill 23-byte blocks,
        // with the final partial block and the pushed bytes closing the list
        let chunks = payload.as_ref();
        assert_eq!(chunks.len(), 6);

        assert_eq!(chunks[0].len(), 23);
        assert_eq!(chunks[1].len(), 25); // 32 - (23 - 16)
        assert_eq!(chunks[2].len(), 23);
        assert_eq!(chunks[3].len(), 23);
        assert_eq!(chunks[4].len(), 20);
        assert_eq!(chunks[5].len(), 6);
    }
}
diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs
index 053f71a2d06..1d1ffeed8c6 100644
--- a/object_store/src/prefix.rs
+++ b/object_store/src/prefix.rs
@@ -23,7 +23,7 @@ use std::ops::Range;
 use crate::path::Path;
 use crate::{
     GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, 
ObjectStore, PutOptions,
-    PutResult, Result,
+    PutPayload, PutResult, Result,
 };
 
 #[doc(hidden)]
@@ -80,14 +80,19 @@ impl<T: ObjectStore> PrefixStore<T> {
 
 #[async_trait::async_trait]
 impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
+    async fn put(&self, location: &Path, payload: PutPayload) -> 
Result<PutResult> {
         let full_path = self.full_path(location);
-        self.inner.put(&full_path, bytes).await
+        self.inner.put(&full_path, payload).await
     }
 
-    async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult> {
+    async fn put_opts(
+        &self,
+        location: &Path,
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> Result<PutResult> {
         let full_path = self.full_path(location);
-        self.inner.put_opts(&full_path, bytes, opts).await
+        self.inner.put_opts(&full_path, payload, opts).await
     }
 
     async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
@@ -218,9 +223,8 @@ mod tests {
 
         let location = Path::from("prefix/test_file.json");
         let data = Bytes::from("arbitrary data");
-        let expected_data = data.clone();
 
-        local.put(&location, data).await.unwrap();
+        local.put(&location, data.clone().into()).await.unwrap();
 
         let prefix = PrefixStore::new(local, "prefix");
         let location_prefix = Path::from("test_file.json");
@@ -239,11 +243,11 @@ mod tests {
             .bytes()
             .await
             .unwrap();
-        assert_eq!(&*read_data, expected_data);
+        assert_eq!(&*read_data, data);
 
         let target_prefix = Path::from("/test_written.json");
         prefix
-            .put(&target_prefix, expected_data.clone())
+            .put(&target_prefix, data.clone().into())
             .await
             .unwrap();
 
@@ -256,6 +260,6 @@ mod tests {
 
         let location = Path::from("prefix/test_written.json");
         let read_data = 
local.get(&location).await.unwrap().bytes().await.unwrap();
-        assert_eq!(&*read_data, expected_data)
+        assert_eq!(&*read_data, data)
     }
 }
diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs
index 65fac5922f6..d089784668e 100644
--- a/object_store/src/throttle.rs
+++ b/object_store/src/throttle.rs
@@ -23,7 +23,7 @@ use std::{convert::TryInto, sync::Arc};
 use crate::multipart::{MultipartStore, PartId};
 use crate::{
     path::Path, GetResult, GetResultPayload, ListResult, MultipartId, 
MultipartUpload, ObjectMeta,
-    ObjectStore, PutOptions, PutResult, Result,
+    ObjectStore, PutOptions, PutPayload, PutResult, Result,
 };
 use crate::{GetOptions, UploadPart};
 use async_trait::async_trait;
@@ -148,14 +148,19 @@ impl<T: ObjectStore> std::fmt::Display for 
ThrottledStore<T> {
 
 #[async_trait]
 impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
+    async fn put(&self, location: &Path, payload: PutPayload) -> 
Result<PutResult> {
         sleep(self.config().wait_put_per_call).await;
-        self.inner.put(location, bytes).await
+        self.inner.put(location, payload).await
     }
 
-    async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult> {
+    async fn put_opts(
+        &self,
+        location: &Path,
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> Result<PutResult> {
         sleep(self.config().wait_put_per_call).await;
-        self.inner.put_opts(location, bytes, opts).await
+        self.inner.put_opts(location, payload, opts).await
     }
 
     async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
@@ -332,7 +337,7 @@ impl<T: MultipartStore> MultipartStore for 
ThrottledStore<T> {
         path: &Path,
         id: &MultipartId,
         part_idx: usize,
-        data: Bytes,
+        data: PutPayload,
     ) -> Result<PartId> {
         sleep(self.config().wait_put_per_call).await;
         self.inner.put_part(path, id, part_idx, data).await
@@ -360,7 +365,7 @@ struct ThrottledUpload {
 
 #[async_trait]
 impl MultipartUpload for ThrottledUpload {
-    fn put_part(&mut self, data: Bytes) -> UploadPart {
+    fn put_part(&mut self, data: PutPayload) -> UploadPart {
         let duration = self.sleep;
         let put = self.upload.put_part(data);
         Box::pin(async move {
@@ -382,7 +387,6 @@ impl MultipartUpload for ThrottledUpload {
 mod tests {
     use super::*;
     use crate::{memory::InMemory, tests::*, GetResultPayload};
-    use bytes::Bytes;
     use futures::TryStreamExt;
     use tokio::time::Duration;
     use tokio::time::Instant;
@@ -536,8 +540,7 @@ mod tests {
 
         if let Some(n_bytes) = n_bytes {
             let data: Vec<_> = std::iter::repeat(1u8).take(n_bytes).collect();
-            let bytes = Bytes::from(data);
-            store.put(&path, bytes).await.unwrap();
+            store.put(&path, data.into()).await.unwrap();
         } else {
             // ensure object is absent
             store.delete(&path).await.unwrap();
@@ -560,9 +563,7 @@ mod tests {
         // create new entries
         for i in 0..n_entries {
             let path = prefix.child(i.to_string().as_str());
-
-            let data = Bytes::from("bar");
-            store.put(&path, data).await.unwrap();
+            store.put(&path, "bar".into()).await.unwrap();
         }
 
         prefix
@@ -630,10 +631,9 @@ mod tests {
 
     async fn measure_put(store: &ThrottledStore<InMemory>, n_bytes: usize) -> 
Duration {
         let data: Vec<_> = std::iter::repeat(1u8).take(n_bytes).collect();
-        let bytes = Bytes::from(data);
 
         let t0 = Instant::now();
-        store.put(&Path::from("foo"), bytes).await.unwrap();
+        store.put(&Path::from("foo"), data.into()).await.unwrap();
 
         t0.elapsed()
     }
diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs
index fe864e2821c..9805df0dda7 100644
--- a/object_store/src/upload.rs
+++ b/object_store/src/upload.rs
@@ -17,14 +17,13 @@
 
 use std::task::{Context, Poll};
 
+use crate::{PutPayload, PutPayloadMut, PutResult, Result};
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::future::BoxFuture;
 use futures::ready;
 use tokio::task::JoinSet;
 
-use crate::{PutResult, Result};
-
 /// An upload part request
 pub type UploadPart = BoxFuture<'static, Result<()>>;
 
@@ -65,7 +64,7 @@ pub trait MultipartUpload: Send + std::fmt::Debug {
     /// ```
     ///
     /// [R2]: 
https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations
-    fn put_part(&mut self, data: Bytes) -> UploadPart;
+    fn put_part(&mut self, data: PutPayload) -> UploadPart;
 
     /// Complete the multipart upload
     ///
@@ -106,7 +105,9 @@ pub trait MultipartUpload: Send + std::fmt::Debug {
 pub struct WriteMultipart {
     upload: Box<dyn MultipartUpload>,
 
-    buffer: Vec<u8>,
+    buffer: PutPayloadMut,
+
+    chunk_size: usize,
 
     tasks: JoinSet<Result<()>>,
 }
@@ -121,7 +122,8 @@ impl WriteMultipart {
     pub fn new_with_chunk_size(upload: Box<dyn MultipartUpload>, chunk_size: 
usize) -> Self {
         Self {
             upload,
-            buffer: Vec::with_capacity(chunk_size),
+            chunk_size,
+            buffer: PutPayloadMut::new(),
             tasks: Default::default(),
         }
     }
@@ -149,6 +151,9 @@ impl WriteMultipart {
 
     /// Write data to this [`WriteMultipart`]
     ///
+    /// Data is buffered using [`PutPayloadMut::extend_from_slice`]. 
Implementations looking to
+    /// write data from owned buffers may prefer [`Self::put`] as this avoids 
copying.
+    ///
     /// Note this method is synchronous (not `async`) and will immediately
     /// start new uploads as soon as the internal `chunk_size` is hit,
     /// regardless of how many outstanding uploads are already in progress.
@@ -157,19 +162,38 @@ impl WriteMultipart {
     /// [`Self::wait_for_capacity`] prior to calling this method
     pub fn write(&mut self, mut buf: &[u8]) {
         while !buf.is_empty() {
-            let capacity = self.buffer.capacity();
-            let remaining = capacity - self.buffer.len();
+            let remaining = self.chunk_size - self.buffer.content_length();
             let to_read = buf.len().min(remaining);
             self.buffer.extend_from_slice(&buf[..to_read]);
             if to_read == remaining {
-                let part = std::mem::replace(&mut self.buffer, 
Vec::with_capacity(capacity));
-                self.put_part(part.into())
+                let buffer = std::mem::take(&mut self.buffer);
+                self.put_part(buffer.into())
             }
             buf = &buf[to_read..]
         }
     }
 
-    fn put_part(&mut self, part: Bytes) {
+    /// Put a chunk of data into this [`WriteMultipart`] without copying
+    ///
+    /// Data is buffered using [`PutPayloadMut::push`]. Implementations 
looking to
+    /// perform writes from non-owned buffers should prefer [`Self::write`] as 
this
+    /// will allow multiple calls to share the same underlying allocation.
+    ///
+    /// See [`Self::write`] for information on backpressure
+    pub fn put(&mut self, mut bytes: Bytes) {
+        while !bytes.is_empty() {
+            let remaining = self.chunk_size - self.buffer.content_length();
+            if bytes.len() < remaining {
+                self.buffer.push(bytes);
+                return;
+            }
+            self.buffer.push(bytes.split_to(remaining));
+            let buffer = std::mem::take(&mut self.buffer);
+            self.put_part(buffer.into())
+        }
+    }
+
+    pub(crate) fn put_part(&mut self, part: PutPayload) {
         self.tasks.spawn(self.upload.put_part(part));
     }
 
diff --git a/object_store/tests/get_range_file.rs 
b/object_store/tests/get_range_file.rs
index 309a86d8fe9..59c59340045 100644
--- a/object_store/tests/get_range_file.rs
+++ b/object_store/tests/get_range_file.rs
@@ -37,8 +37,13 @@ impl std::fmt::Display for MyStore {
 
 #[async_trait]
 impl ObjectStore for MyStore {
-    async fn put_opts(&self, path: &Path, data: Bytes, opts: PutOptions) -> 
Result<PutResult> {
-        self.0.put_opts(path, data, opts).await
+    async fn put_opts(
+        &self,
+        location: &Path,
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> Result<PutResult> {
+        self.0.put_opts(location, payload, opts).await
     }
 
     async fn put_multipart(&self, _location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
@@ -77,7 +82,7 @@ async fn test_get_range() {
     let path = Path::from("foo");
 
     let expected = Bytes::from_static(b"hello world");
-    store.put(&path, expected.clone()).await.unwrap();
+    store.put(&path, expected.clone().into()).await.unwrap();
     let fetched = store.get(&path).await.unwrap().bytes().await.unwrap();
     assert_eq!(expected, fetched);
 
@@ -101,7 +106,7 @@ async fn test_get_opts_over_range() {
     let path = Path::from("foo");
 
     let expected = Bytes::from_static(b"hello world");
-    store.put(&path, expected.clone()).await.unwrap();
+    store.put(&path, expected.clone().into()).await.unwrap();
 
     let opts = GetOptions {
         range: Some(GetRange::Bounded(0..(expected.len() * 2))),

Reply via email to