This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 96c4c0b1687 Replace AsyncWrite with Upload trait and rename 
MultiPartStore to MultipartStore (#5458) (#5500)
96c4c0b1687 is described below

commit 96c4c0b1687867a4809ed4d3391982d4f5f7273e
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed Mar 20 12:26:54 2024 +1300

    Replace AsyncWrite with Upload trait and rename MultiPartStore to 
MultipartStore (#5458) (#5500)
    
    * Replace AsyncWrite with Upload trait (#5458)
    
    * Make BufWriter abortable
    
    * Flesh out cloud implementations
    
    * Review feedback
    
    * Misc tweaks and fixes
    
    * Format
    
    * Replace multi-part with multipart
    
    * More docs
    
    * Clippy
    
    * Rename to MultipartUpload
    
    * Rename ChunkedUpload to WriteMultipart
    
    * Doc tweaks
    
    * Apply suggestions from code review
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Docs
    
    * Format
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 object_store/src/aws/mod.rs          | 104 +++++-----
 object_store/src/azure/mod.rs        |  80 ++++----
 object_store/src/buffered.rs         |  84 ++++----
 object_store/src/chunked.rs          |  16 +-
 object_store/src/client/mod.rs       |   3 +
 object_store/src/client/parts.rs     |  48 +++++
 object_store/src/gcp/client.rs       |   2 +-
 object_store/src/gcp/mod.rs          | 106 ++++++-----
 object_store/src/http/mod.rs         |  14 +-
 object_store/src/lib.rs              | 127 ++++---------
 object_store/src/limit.rs            |  79 ++++----
 object_store/src/local.rs            | 358 ++++++++++++-----------------------
 object_store/src/memory.rs           |  95 ++++------
 object_store/src/multipart.rs        | 243 +-----------------------
 object_store/src/prefix.rs           |  14 +-
 object_store/src/throttle.rs         |  16 +-
 object_store/src/upload.rs           | 175 +++++++++++++++++
 object_store/tests/get_range_file.rs |  10 +-
 18 files changed, 691 insertions(+), 883 deletions(-)

diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index b11f4513b6d..b33771de9a8 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -17,17 +17,14 @@
 
 //! An object store implementation for S3
 //!
-//! ## Multi-part uploads
+//! ## Multipart uploads
 //!
-//! Multi-part uploads can be initiated with the [ObjectStore::put_multipart] 
method.
-//! Data passed to the writer is automatically buffered to meet the minimum 
size
-//! requirements for a part. Multiple parts are uploaded concurrently.
+//! Multipart uploads can be initiated with the [ObjectStore::put_multipart] 
method.
 //!
 //! If the writer fails for any reason, you may have parts uploaded to AWS but 
not
-//! used that you may be charged for. Use the [ObjectStore::abort_multipart] 
method
-//! to abort the upload and drop those unneeded parts. In addition, you may 
wish to
-//! consider implementing [automatic cleanup] of unused parts that are older 
than one
-//! week.
+//! used that you will be charged for. [`MultipartUpload::abort`] may be 
invoked to drop
+//! these unneeded parts, however, it is recommended that you consider 
implementing
+//! [automatic cleanup] of unused parts that are older than some threshold.
 //!
 //! [automatic cleanup]: 
https://aws.amazon.com/blogs/aws/s3-lifecycle-management-update-support-for-multipart-uploads-and-delete-markers/
 
@@ -38,18 +35,17 @@ use futures::{StreamExt, TryStreamExt};
 use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
 use reqwest::{Method, StatusCode};
 use std::{sync::Arc, time::Duration};
-use tokio::io::AsyncWrite;
 use url::Url;
 
 use crate::aws::client::{RequestError, S3Client};
 use crate::client::get::GetClientExt;
 use crate::client::list::ListClientExt;
 use crate::client::CredentialProvider;
-use crate::multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart};
+use crate::multipart::{MultipartStore, PartId};
 use crate::signer::Signer;
 use crate::{
-    Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, 
ObjectStore, Path, PutMode,
-    PutOptions, PutResult, Result,
+    Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, 
ObjectMeta,
+    ObjectStore, Path, PutMode, PutOptions, PutResult, Result, UploadPart,
 };
 
 static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging");
@@ -85,6 +81,7 @@ const STORE: &str = "S3";
 
 /// [`CredentialProvider`] for [`AmazonS3`]
 pub type AwsCredentialProvider = Arc<dyn CredentialProvider<Credential = 
AwsCredential>>;
+use crate::client::parts::Parts;
 pub use credential::{AwsAuthorizer, AwsCredential};
 
 /// Interface for [Amazon S3](https://aws.amazon.com/s3/).
@@ -211,25 +208,18 @@ impl ObjectStore for AmazonS3 {
         }
     }
 
-    async fn put_multipart(
-        &self,
-        location: &Path,
-    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
-        let id = self.client.create_multipart(location).await?;
-
-        let upload = S3MultiPartUpload {
-            location: location.clone(),
-            upload_id: id.clone(),
-            client: Arc::clone(&self.client),
-        };
-
-        Ok((id, Box::new(WriteMultiPart::new(upload, 8))))
-    }
-
-    async fn abort_multipart(&self, location: &Path, multipart_id: 
&MultipartId) -> Result<()> {
-        self.client
-            .delete_request(location, &[("uploadId", multipart_id)])
-            .await
+    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
+        let upload_id = self.client.create_multipart(location).await?;
+
+        Ok(Box::new(S3MultiPartUpload {
+            part_idx: 0,
+            state: Arc::new(UploadState {
+                client: Arc::clone(&self.client),
+                location: location.clone(),
+                upload_id: upload_id.clone(),
+                parts: Default::default(),
+            }),
+        }))
     }
 
     async fn get_opts(&self, location: &Path, options: GetOptions) -> 
Result<GetResult> {
@@ -319,30 +309,55 @@ impl ObjectStore for AmazonS3 {
     }
 }
 
+#[derive(Debug)]
 struct S3MultiPartUpload {
+    part_idx: usize,
+    state: Arc<UploadState>,
+}
+
+#[derive(Debug)]
+struct UploadState {
+    parts: Parts,
     location: Path,
     upload_id: String,
     client: Arc<S3Client>,
 }
 
 #[async_trait]
-impl PutPart for S3MultiPartUpload {
-    async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
-        self.client
-            .put_part(&self.location, &self.upload_id, part_idx, buf.into())
+impl MultipartUpload for S3MultiPartUpload {
+    fn put_part(&mut self, data: Bytes) -> UploadPart {
+        let idx = self.part_idx;
+        self.part_idx += 1;
+        let state = Arc::clone(&self.state);
+        Box::pin(async move {
+            let part = state
+                .client
+                .put_part(&state.location, &state.upload_id, idx, data)
+                .await?;
+            state.parts.put(idx, part);
+            Ok(())
+        })
+    }
+
+    async fn complete(&mut self) -> Result<PutResult> {
+        let parts = self.state.parts.finish(self.part_idx)?;
+
+        self.state
+            .client
+            .complete_multipart(&self.state.location, &self.state.upload_id, 
parts)
             .await
     }
 
-    async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
-        self.client
-            .complete_multipart(&self.location, &self.upload_id, 
completed_parts)
-            .await?;
-        Ok(())
+    async fn abort(&mut self) -> Result<()> {
+        self.state
+            .client
+            .delete_request(&self.state.location, &[("uploadId", 
&self.state.upload_id)])
+            .await
     }
 }
 
 #[async_trait]
-impl MultiPartStore for AmazonS3 {
+impl MultipartStore for AmazonS3 {
     async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
         self.client.create_multipart(path).await
     }
@@ -377,7 +392,6 @@ mod tests {
     use crate::{client::get::GetClient, tests::*};
     use bytes::Bytes;
     use hyper::HeaderMap;
-    use tokio::io::AsyncWriteExt;
 
     const NON_EXISTENT_NAME: &str = "nonexistentname";
 
@@ -542,9 +556,9 @@ mod tests {
         store.put(&locations[0], data.clone()).await.unwrap();
         store.copy(&locations[0], &locations[1]).await.unwrap();
 
-        let (_, mut writer) = 
store.put_multipart(&locations[2]).await.unwrap();
-        writer.write_all(&data).await.unwrap();
-        writer.shutdown().await.unwrap();
+        let mut upload = store.put_multipart(&locations[2]).await.unwrap();
+        upload.put_part(data.clone()).await.unwrap();
+        upload.complete().await.unwrap();
 
         for location in &locations {
             let res = store
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 712b7a36c56..5d3a405ccc9 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -19,19 +19,15 @@
 //!
 //! ## Streaming uploads
 //!
-//! [ObjectStore::put_multipart] will upload data in blocks and write a blob 
from those
-//! blocks. Data is buffered internally to make blocks of at least 5MB and 
blocks
-//! are uploaded concurrently.
+//! [ObjectStore::put_multipart] will upload data in blocks and write a blob 
from those blocks.
 //!
-//! [ObjectStore::abort_multipart] is a no-op, since Azure Blob Store doesn't 
provide
-//! a way to drop old blocks. Instead unused blocks are automatically cleaned 
up
-//! after 7 days.
+//! Unused blocks will automatically be dropped after 7 days.
 use crate::{
-    multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart},
+    multipart::{MultipartStore, PartId},
     path::Path,
     signer::Signer,
-    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, 
PutOptions, PutResult,
-    Result,
+    GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, 
ObjectMeta, ObjectStore,
+    PutOptions, PutResult, Result, UploadPart,
 };
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -40,7 +36,6 @@ use reqwest::Method;
 use std::fmt::Debug;
 use std::sync::Arc;
 use std::time::Duration;
-use tokio::io::AsyncWrite;
 use url::Url;
 
 use crate::client::get::GetClientExt;
@@ -54,6 +49,8 @@ mod credential;
 
 /// [`CredentialProvider`] for [`MicrosoftAzure`]
 pub type AzureCredentialProvider = Arc<dyn CredentialProvider<Credential = 
AzureCredential>>;
+use crate::azure::client::AzureClient;
+use crate::client::parts::Parts;
 pub use builder::{AzureConfigKey, MicrosoftAzureBuilder};
 pub use credential::AzureCredential;
 
@@ -94,21 +91,15 @@ impl ObjectStore for MicrosoftAzure {
         self.client.put_blob(location, bytes, opts).await
     }
 
-    async fn put_multipart(
-        &self,
-        location: &Path,
-    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
-        let inner = AzureMultiPartUpload {
-            client: Arc::clone(&self.client),
-            location: location.to_owned(),
-        };
-        Ok((String::new(), Box::new(WriteMultiPart::new(inner, 8))))
-    }
-
-    async fn abort_multipart(&self, _location: &Path, _multipart_id: 
&MultipartId) -> Result<()> {
-        // There is no way to drop blocks that have been uploaded. Instead, 
they simply
-        // expire in 7 days.
-        Ok(())
+    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
+        Ok(Box::new(AzureMultiPartUpload {
+            part_idx: 0,
+            state: Arc::new(UploadState {
+                client: Arc::clone(&self.client),
+                location: location.clone(),
+                parts: Default::default(),
+            }),
+        }))
     }
 
     async fn get_opts(&self, location: &Path, options: GetOptions) -> 
Result<GetResult> {
@@ -197,26 +188,49 @@ impl Signer for MicrosoftAzure {
 /// put_multipart_part -> PUT block
 /// complete -> PUT block list
 /// abort -> No equivalent; blocks are simply dropped after 7 days
-#[derive(Debug, Clone)]
+#[derive(Debug)]
 struct AzureMultiPartUpload {
-    client: Arc<client::AzureClient>,
+    part_idx: usize,
+    state: Arc<UploadState>,
+}
+
+#[derive(Debug)]
+struct UploadState {
     location: Path,
+    parts: Parts,
+    client: Arc<AzureClient>,
 }
 
 #[async_trait]
-impl PutPart for AzureMultiPartUpload {
-    async fn put_part(&self, buf: Vec<u8>, idx: usize) -> Result<PartId> {
-        self.client.put_block(&self.location, idx, buf.into()).await
+impl MultipartUpload for AzureMultiPartUpload {
+    fn put_part(&mut self, data: Bytes) -> UploadPart {
+        let idx = self.part_idx;
+        self.part_idx += 1;
+        let state = Arc::clone(&self.state);
+        Box::pin(async move {
+            let part = state.client.put_block(&state.location, idx, 
data).await?;
+            state.parts.put(idx, part);
+            Ok(())
+        })
+    }
+
+    async fn complete(&mut self) -> Result<PutResult> {
+        let parts = self.state.parts.finish(self.part_idx)?;
+
+        self.state
+            .client
+            .put_block_list(&self.state.location, parts)
+            .await
     }
 
-    async fn complete(&self, parts: Vec<PartId>) -> Result<()> {
-        self.client.put_block_list(&self.location, parts).await?;
+    async fn abort(&mut self) -> Result<()> {
+        // Nothing to do
         Ok(())
     }
 }
 
 #[async_trait]
-impl MultiPartStore for MicrosoftAzure {
+impl MultipartStore for MicrosoftAzure {
     async fn create_multipart(&self, _: &Path) -> Result<MultipartId> {
         Ok(String::new())
     }
diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs
index 9299e1147bc..39f8eafbef7 100644
--- a/object_store/src/buffered.rs
+++ b/object_store/src/buffered.rs
@@ -18,7 +18,7 @@
 //! Utilities for performing tokio-style buffered IO
 
 use crate::path::Path;
-use crate::{MultipartId, ObjectMeta, ObjectStore};
+use crate::{ObjectMeta, ObjectStore, WriteMultipart};
 use bytes::Bytes;
 use futures::future::{BoxFuture, FutureExt};
 use futures::ready;
@@ -27,7 +27,7 @@ use std::io::{Error, ErrorKind, SeekFrom};
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
-use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, AsyncWriteExt, 
ReadBuf};
+use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
 
 /// The default buffer size used by [`BufReader`]
 pub const DEFAULT_BUFFER_SIZE: usize = 1024 * 1024;
@@ -217,7 +217,6 @@ impl AsyncBufRead for BufReader {
 pub struct BufWriter {
     capacity: usize,
     state: BufWriterState,
-    multipart_id: Option<MultipartId>,
     store: Arc<dyn ObjectStore>,
 }
 
@@ -225,22 +224,19 @@ impl std::fmt::Debug for BufWriter {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("BufWriter")
             .field("capacity", &self.capacity)
-            .field("multipart_id", &self.multipart_id)
             .finish()
     }
 }
 
-type MultipartResult = (MultipartId, Box<dyn AsyncWrite + Send + Unpin>);
-
 enum BufWriterState {
     /// Buffer up to capacity bytes
     Buffer(Path, Vec<u8>),
     /// [`ObjectStore::put_multipart`]
-    Prepare(BoxFuture<'static, std::io::Result<MultipartResult>>),
+    Prepare(BoxFuture<'static, std::io::Result<WriteMultipart>>),
     /// Write to a multipart upload
-    Write(Box<dyn AsyncWrite + Send + Unpin>),
+    Write(Option<WriteMultipart>),
     /// [`ObjectStore::put`]
-    Put(BoxFuture<'static, std::io::Result<()>>),
+    Flush(BoxFuture<'static, std::io::Result<()>>),
 }
 
 impl BufWriter {
@@ -255,14 +251,20 @@ impl BufWriter {
             capacity,
             store,
             state: BufWriterState::Buffer(path, Vec::new()),
-            multipart_id: None,
         }
     }
 
-    /// Returns the [`MultipartId`] of the multipart upload created by this
-    /// writer, if any.
-    pub fn multipart_id(&self) -> Option<&MultipartId> {
-        self.multipart_id.as_ref()
+    /// Abort this writer, cleaning up any partially uploaded state
+    ///
+    /// # Panic
+    ///
+    /// Panics if this writer has already been shutdown or aborted
+    pub async fn abort(&mut self) -> crate::Result<()> {
+        match &mut self.state {
+            BufWriterState::Buffer(_, _) | BufWriterState::Prepare(_) => 
Ok(()),
+            BufWriterState::Flush(_) => panic!("Already shut down"),
+            BufWriterState::Write(x) => x.take().unwrap().abort().await,
+        }
     }
 }
 
@@ -275,12 +277,15 @@ impl AsyncWrite for BufWriter {
         let cap = self.capacity;
         loop {
             return match &mut self.state {
-                BufWriterState::Write(write) => Pin::new(write).poll_write(cx, 
buf),
-                BufWriterState::Put(_) => panic!("Already shut down"),
+                BufWriterState::Write(Some(write)) => {
+                    write.write(buf);
+                    Poll::Ready(Ok(buf.len()))
+                }
+                BufWriterState::Write(None) | BufWriterState::Flush(_) => {
+                    panic!("Already shut down")
+                }
                 BufWriterState::Prepare(f) => {
-                    let (id, w) = ready!(f.poll_unpin(cx)?);
-                    self.state = BufWriterState::Write(w);
-                    self.multipart_id = Some(id);
+                    self.state = 
BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
                     continue;
                 }
                 BufWriterState::Buffer(path, b) => {
@@ -289,9 +294,10 @@ impl AsyncWrite for BufWriter {
                         let path = std::mem::take(path);
                         let store = Arc::clone(&self.store);
                         self.state = BufWriterState::Prepare(Box::pin(async 
move {
-                            let (id, mut writer) = 
store.put_multipart(&path).await?;
-                            writer.write_all(&buffer).await?;
-                            Ok((id, writer))
+                            let upload = store.put_multipart(&path).await?;
+                            let mut chunked = WriteMultipart::new(upload);
+                            chunked.write(&buffer);
+                            Ok(chunked)
                         }));
                         continue;
                     }
@@ -305,13 +311,10 @@ impl AsyncWrite for BufWriter {
     fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Result<(), Error>> {
         loop {
             return match &mut self.state {
-                BufWriterState::Buffer(_, _) => Poll::Ready(Ok(())),
-                BufWriterState::Write(write) => Pin::new(write).poll_flush(cx),
-                BufWriterState::Put(_) => panic!("Already shut down"),
+                BufWriterState::Write(_) | BufWriterState::Buffer(_, _) => 
Poll::Ready(Ok(())),
+                BufWriterState::Flush(_) => panic!("Already shut down"),
                 BufWriterState::Prepare(f) => {
-                    let (id, w) = ready!(f.poll_unpin(cx)?);
-                    self.state = BufWriterState::Write(w);
-                    self.multipart_id = Some(id);
+                    self.state = 
BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
                     continue;
                 }
             };
@@ -322,21 +325,28 @@ impl AsyncWrite for BufWriter {
         loop {
             match &mut self.state {
                 BufWriterState::Prepare(f) => {
-                    let (id, w) = ready!(f.poll_unpin(cx)?);
-                    self.state = BufWriterState::Write(w);
-                    self.multipart_id = Some(id);
+                    self.state = 
BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
                 }
                 BufWriterState::Buffer(p, b) => {
                     let buf = std::mem::take(b);
                     let path = std::mem::take(p);
                     let store = Arc::clone(&self.store);
-                    self.state = BufWriterState::Put(Box::pin(async move {
+                    self.state = BufWriterState::Flush(Box::pin(async move {
                         store.put(&path, buf.into()).await?;
                         Ok(())
                     }));
                 }
-                BufWriterState::Put(f) => return f.poll_unpin(cx),
-                BufWriterState::Write(w) => return 
Pin::new(w).poll_shutdown(cx),
+                BufWriterState::Flush(f) => return f.poll_unpin(cx),
+                BufWriterState::Write(x) => {
+                    let upload = x.take().unwrap();
+                    self.state = BufWriterState::Flush(
+                        async move {
+                            upload.finish().await?;
+                            Ok(())
+                        }
+                        .boxed(),
+                    )
+                }
             }
         }
     }
@@ -357,7 +367,7 @@ mod tests {
     use super::*;
     use crate::memory::InMemory;
     use crate::path::Path;
-    use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt};
+    use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, 
AsyncWriteExt};
 
     #[tokio::test]
     async fn test_buf_reader() {
@@ -448,9 +458,7 @@ mod tests {
         writer.write_all(&[0; 20]).await.unwrap();
         writer.flush().await.unwrap();
         writer.write_all(&[0; 5]).await.unwrap();
-        assert!(writer.multipart_id().is_none());
         writer.shutdown().await.unwrap();
-        assert!(writer.multipart_id().is_none());
         assert_eq!(store.head(&path).await.unwrap().size, 25);
 
         // Test multipart
@@ -458,9 +466,7 @@ mod tests {
         writer.write_all(&[0; 20]).await.unwrap();
         writer.flush().await.unwrap();
         writer.write_all(&[0; 20]).await.unwrap();
-        assert!(writer.multipart_id().is_some());
         writer.shutdown().await.unwrap();
-        assert!(writer.multipart_id().is_some());
 
         assert_eq!(store.head(&path).await.unwrap().size, 40);
     }
diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs
index d33556f4b12..6db7f4b35e2 100644
--- a/object_store/src/chunked.rs
+++ b/object_store/src/chunked.rs
@@ -25,14 +25,13 @@ use async_trait::async_trait;
 use bytes::{BufMut, Bytes, BytesMut};
 use futures::stream::BoxStream;
 use futures::StreamExt;
-use tokio::io::AsyncWrite;
 
 use crate::path::Path;
+use crate::Result;
 use crate::{
-    GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, 
ObjectStore, PutOptions,
-    PutResult,
+    GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, 
ObjectMeta, ObjectStore,
+    PutOptions, PutResult,
 };
-use crate::{MultipartId, Result};
 
 /// Wraps a [`ObjectStore`] and makes its get response return chunks
 /// in a controllable manner.
@@ -67,17 +66,10 @@ impl ObjectStore for ChunkedStore {
         self.inner.put_opts(location, bytes, opts).await
     }
 
-    async fn put_multipart(
-        &self,
-        location: &Path,
-    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
         self.inner.put_multipart(location).await
     }
 
-    async fn abort_multipart(&self, location: &Path, multipart_id: 
&MultipartId) -> Result<()> {
-        self.inner.abort_multipart(location, multipart_id).await
-    }
-
     async fn get_opts(&self, location: &Path, options: GetOptions) -> 
Result<GetResult> {
         let r = self.inner.get_opts(location, options).await?;
         let stream = match r.payload {
diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs
index 252e9fdcadf..7728f38954f 100644
--- a/object_store/src/client/mod.rs
+++ b/object_store/src/client/mod.rs
@@ -40,6 +40,9 @@ pub mod header;
 #[cfg(any(feature = "aws", feature = "gcp"))]
 pub mod s3;
 
+#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
+pub mod parts;
+
 use async_trait::async_trait;
 use std::collections::HashMap;
 use std::str::FromStr;
diff --git a/object_store/src/client/parts.rs b/object_store/src/client/parts.rs
new file mode 100644
index 00000000000..9fc301edcf8
--- /dev/null
+++ b/object_store/src/client/parts.rs
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::multipart::PartId;
+use parking_lot::Mutex;
+
+/// An interior mutable collection of upload parts and their corresponding 
part index
+#[derive(Debug, Default)]
+pub(crate) struct Parts(Mutex<Vec<(usize, PartId)>>);
+
+impl Parts {
+    /// Record the [`PartId`] for a given index
+    ///
+    /// Note: calling this method multiple times with the same `part_idx`
+    /// will result in multiple [`PartId`] in the final output
+    pub(crate) fn put(&self, part_idx: usize, id: PartId) {
+        self.0.lock().push((part_idx, id))
+    }
+
+    /// Produce the final list of [`PartId`] ordered by `part_idx`
+    ///
+    /// `expected` is the number of parts expected in the final result
+    pub(crate) fn finish(&self, expected: usize) -> crate::Result<Vec<PartId>> 
{
+        let mut parts = self.0.lock();
+        if parts.len() != expected {
+            return Err(crate::Error::Generic {
+                store: "Parts",
+                source: "Missing part".to_string().into(),
+            });
+        }
+        parts.sort_unstable_by_key(|(idx, _)| *idx);
+        Ok(parts.drain(..).map(|(_, v)| v).collect())
+    }
+}
diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs
index e4b0f9af7d1..def53beefe7 100644
--- a/object_store/src/gcp/client.rs
+++ b/object_store/src/gcp/client.rs
@@ -272,7 +272,7 @@ impl GoogleCloudStorageClient {
         })
     }
 
-    /// Initiate a multi-part upload 
<https://cloud.google.com/storage/docs/xml-api/post-object-multipart>
+    /// Initiate a multipart upload 
<https://cloud.google.com/storage/docs/xml-api/post-object-multipart>
     pub async fn multipart_initiate(&self, path: &Path) -> Result<MultipartId> 
{
         let credential = self.get_credential().await?;
         let url = self.object_url(path);
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index 8633abbfb4d..2058d1f8055 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -17,18 +17,14 @@
 
 //! An object store implementation for Google Cloud Storage
 //!
-//! ## Multi-part uploads
+//! ## Multipart uploads
 //!
-//! [Multi-part 
uploads](https://cloud.google.com/storage/docs/multipart-uploads)
-//! can be initiated with the [ObjectStore::put_multipart] method.
-//! Data passed to the writer is automatically buffered to meet the minimum 
size
-//! requirements for a part. Multiple parts are uploaded concurrently.
-//!
-//! If the writer fails for any reason, you may have parts uploaded to GCS but 
not
-//! used that you may be charged for. Use the [ObjectStore::abort_multipart] 
method
-//! to abort the upload and drop those unneeded parts. In addition, you may 
wish to
-//! consider implementing automatic clean up of unused parts that are older 
than one
-//! week.
+//! [Multipart 
uploads](https://cloud.google.com/storage/docs/multipart-uploads)
+//! can be initiated with the [ObjectStore::put_multipart] method. If neither
+//! [`MultipartUpload::complete`] nor [`MultipartUpload::abort`] is invoked, 
you may
+//! have parts uploaded to GCS but not used, that you will be charged for. It 
is recommended
+//! you configure a [lifecycle rule] to abort incomplete multipart uploads 
after a certain
+//! period of time to avoid being charged for storing partial uploads.
 //!
 //! ## Using HTTP/2
 //!
@@ -36,24 +32,24 @@
 //! because it allows much higher throughput in our benchmarks (see
 //! [#5194](https://github.com/apache/arrow-rs/issues/5194)). HTTP/2 can be
 //! enabled by setting [crate::ClientConfigKey::Http1Only] to false.
+//!
+//! [lifecycle rule]: https://cloud.google.com/storage/docs/lifecycle#abort-mpu
 use std::sync::Arc;
 
 use crate::client::CredentialProvider;
 use crate::{
-    multipart::{PartId, PutPart, WriteMultiPart},
-    path::Path,
-    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, 
PutOptions, PutResult,
-    Result,
+    multipart::PartId, path::Path, GetOptions, GetResult, ListResult, 
MultipartId, MultipartUpload,
+    ObjectMeta, ObjectStore, PutOptions, PutResult, Result, UploadPart,
 };
 use async_trait::async_trait;
 use bytes::Bytes;
 use client::GoogleCloudStorageClient;
 use futures::stream::BoxStream;
-use tokio::io::AsyncWrite;
 
 use crate::client::get::GetClientExt;
 use crate::client::list::ListClientExt;
-use crate::multipart::MultiPartStore;
+use crate::client::parts::Parts;
+use crate::multipart::MultipartStore;
 pub use builder::{GoogleCloudStorageBuilder, GoogleConfigKey};
 pub use credential::GcpCredential;
 
@@ -89,27 +85,50 @@ impl GoogleCloudStorage {
     }
 }
 
+#[derive(Debug)]
 struct GCSMultipartUpload {
+    state: Arc<UploadState>,
+    part_idx: usize,
+}
+
+#[derive(Debug)]
+struct UploadState {
     client: Arc<GoogleCloudStorageClient>,
     path: Path,
     multipart_id: MultipartId,
+    parts: Parts,
 }
 
 #[async_trait]
-impl PutPart for GCSMultipartUpload {
-    /// Upload an object part 
<https://cloud.google.com/storage/docs/xml-api/put-object-multipart>
-    async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
-        self.client
-            .put_part(&self.path, &self.multipart_id, part_idx, buf.into())
+impl MultipartUpload for GCSMultipartUpload {
+    fn put_part(&mut self, data: Bytes) -> UploadPart {
+        let idx = self.part_idx;
+        self.part_idx += 1;
+        let state = Arc::clone(&self.state);
+        Box::pin(async move {
+            let part = state
+                .client
+                .put_part(&state.path, &state.multipart_id, idx, data)
+                .await?;
+            state.parts.put(idx, part);
+            Ok(())
+        })
+    }
+
+    async fn complete(&mut self) -> Result<PutResult> {
+        let parts = self.state.parts.finish(self.part_idx)?;
+
+        self.state
+            .client
+            .multipart_complete(&self.state.path, &self.state.multipart_id, 
parts)
             .await
     }
 
-    /// Complete a multipart upload 
<https://cloud.google.com/storage/docs/xml-api/post-object-complete>
-    async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
-        self.client
-            .multipart_complete(&self.path, &self.multipart_id, 
completed_parts)
-            .await?;
-        Ok(())
+    async fn abort(&mut self) -> Result<()> {
+        self.state
+            .client
+            .multipart_cleanup(&self.state.path, &self.state.multipart_id)
+            .await
     }
 }
 
@@ -119,27 +138,18 @@ impl ObjectStore for GoogleCloudStorage {
         self.client.put(location, bytes, opts).await
     }
 
-    async fn put_multipart(
-        &self,
-        location: &Path,
-    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
         let upload_id = self.client.multipart_initiate(location).await?;
 
-        let inner = GCSMultipartUpload {
-            client: Arc::clone(&self.client),
-            path: location.clone(),
-            multipart_id: upload_id.clone(),
-        };
-
-        Ok((upload_id, Box::new(WriteMultiPart::new(inner, 8))))
-    }
-
-    async fn abort_multipart(&self, location: &Path, multipart_id: 
&MultipartId) -> Result<()> {
-        self.client
-            .multipart_cleanup(location, multipart_id)
-            .await?;
-
-        Ok(())
+        Ok(Box::new(GCSMultipartUpload {
+            part_idx: 0,
+            state: Arc::new(UploadState {
+                client: Arc::clone(&self.client),
+                path: location.clone(),
+                multipart_id: upload_id.clone(),
+                parts: Default::default(),
+            }),
+        }))
     }
 
     async fn get_opts(&self, location: &Path, options: GetOptions) -> 
Result<GetResult> {
@@ -176,7 +186,7 @@ impl ObjectStore for GoogleCloudStorage {
 }
 
 #[async_trait]
-impl MultiPartStore for GoogleCloudStorage {
+impl MultipartStore for GoogleCloudStorage {
     async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
         self.client.multipart_initiate(path).await
     }
diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs
index f1d11db4762..626337df27f 100644
--- a/object_store/src/http/mod.rs
+++ b/object_store/src/http/mod.rs
@@ -37,7 +37,6 @@ use futures::stream::BoxStream;
 use futures::{StreamExt, TryStreamExt};
 use itertools::Itertools;
 use snafu::{OptionExt, ResultExt, Snafu};
-use tokio::io::AsyncWrite;
 use url::Url;
 
 use crate::client::get::GetClientExt;
@@ -45,7 +44,7 @@ use crate::client::header::get_etag;
 use crate::http::client::Client;
 use crate::path::Path;
 use crate::{
-    ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, 
MultipartId, ObjectMeta,
+    ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, 
MultipartUpload, ObjectMeta,
     ObjectStore, PutMode, PutOptions, PutResult, Result, RetryConfig,
 };
 
@@ -115,15 +114,8 @@ impl ObjectStore for HttpStore {
         })
     }
 
-    async fn put_multipart(
-        &self,
-        _location: &Path,
-    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
-        Err(super::Error::NotImplemented)
-    }
-
-    async fn abort_multipart(&self, _location: &Path, _multipart_id: 
&MultipartId) -> Result<()> {
-        Err(super::Error::NotImplemented)
+    async fn put_multipart(&self, _location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
+        Err(crate::Error::NotImplemented)
     }
 
     async fn get_opts(&self, location: &Path, options: GetOptions) -> 
Result<GetResult> {
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 4960f3ba390..e02675d88ab 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -269,12 +269,11 @@
 //!
 //! #  Multipart Upload
 //!
-//! Use the [`ObjectStore::put_multipart`] method to atomically write a large 
amount of data,
-//! with implementations automatically handling parallel, chunked upload where 
appropriate.
+//! Use the [`ObjectStore::put_multipart`] method to atomically write a large 
amount of data
 //!
 //! ```
 //! # use object_store::local::LocalFileSystem;
-//! # use object_store::ObjectStore;
+//! # use object_store::{ObjectStore, WriteMultipart};
 //! # use std::sync::Arc;
 //! # use bytes::Bytes;
 //! # use tokio::io::AsyncWriteExt;
@@ -286,12 +285,10 @@
 //! #
 //! let object_store: Arc<dyn ObjectStore> = get_object_store();
 //! let path = Path::from("data/large_file");
-//! let (_id, mut writer) =  object_store.put_multipart(&path).await.unwrap();
-//!
-//! let bytes = Bytes::from_static(b"hello");
-//! writer.write_all(&bytes).await.unwrap();
-//! writer.flush().await.unwrap();
-//! writer.shutdown().await.unwrap();
+//! let upload =  object_store.put_multipart(&path).await.unwrap();
+//! let mut write = WriteMultipart::new(upload);
+//! write.write(b"hello");
+//! write.finish().await.unwrap();
 //! # }
 //! ```
 //!
@@ -501,9 +498,11 @@ pub use tags::TagSet;
 
 pub mod multipart;
 mod parse;
+mod upload;
 mod util;
 
 pub use parse::{parse_url, parse_url_opts};
+pub use upload::*;
 pub use util::GetRange;
 
 use crate::path::Path;
@@ -520,12 +519,11 @@ use std::fmt::{Debug, Formatter};
 use std::io::{Read, Seek, SeekFrom};
 use std::ops::Range;
 use std::sync::Arc;
-use tokio::io::AsyncWrite;
 
 /// An alias for a dynamically dispatched object store implementation.
 pub type DynObjectStore = dyn ObjectStore;
 
-/// Id type for multi-part uploads.
+/// Id type for multipart uploads.
 pub type MultipartId = String;
 
 /// Universal API to multiple object store services.
@@ -543,48 +541,11 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + 
Debug + 'static {
     /// Save the provided bytes to the specified location with the given 
options
     async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult>;
 
-    /// Get a multi-part upload that allows writing data in chunks.
-    ///
-    /// Most cloud-based uploads will buffer and upload parts in parallel.
-    ///
-    /// To complete the upload, [AsyncWrite::poll_shutdown] must be called
-    /// to completion. This operation is guaranteed to be atomic, it will 
either
-    /// make all the written data available at `location`, or fail. No clients
-    /// should be able to observe a partially written object.
-    ///
-    /// For some object stores (S3, GCS, and local in particular), if the
-    /// writer fails or panics, you must call [ObjectStore::abort_multipart]
-    /// to clean up partially written data.
-    ///
-    /// <div class="warning">
-    /// It is recommended applications wait for any in-flight requests to 
complete by calling `flush`, if
-    /// there may be a significant gap in time (> ~30s) before the next write.
-    /// These gaps can include times where the function returns control to the
-    /// caller while keeping the writer open. If `flush` is not called, futures
-    /// for in-flight requests may be left unpolled long enough for the 
requests
-    /// to time out, causing the write to fail.
-    /// </div>
-    ///
-    /// For applications requiring fine-grained control of multipart uploads
-    /// see [`MultiPartStore`], although note that this interface cannot be
-    /// supported by all [`ObjectStore`] backends.
-    ///
-    /// For applications looking to implement this interface for a custom
-    /// multipart API, see [`WriteMultiPart`] which handles the complexities
-    /// of performing parallel uploads of fixed size parts.
-    ///
-    /// [`WriteMultiPart`]: multipart::WriteMultiPart
-    /// [`MultiPartStore`]: multipart::MultiPartStore
-    async fn put_multipart(
-        &self,
-        location: &Path,
-    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)>;
-
-    /// Cleanup an aborted upload.
+    /// Perform a multipart upload
     ///
-    /// See documentation for individual stores for exact behavior, as 
capabilities
-    /// vary by object store.
-    async fn abort_multipart(&self, location: &Path, multipart_id: 
&MultipartId) -> Result<()>;
+    /// Client should prefer [`ObjectStore::put`] for small payloads, as 
streaming uploads
+    /// typically require multiple separate requests. See [`MultipartUpload`] 
for more information
+    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>>;
 
     /// Return the bytes that are stored at the specified location.
     async fn get(&self, location: &Path) -> Result<GetResult> {
@@ -769,21 +730,10 @@ macro_rules! as_ref_impl {
                 self.as_ref().put_opts(location, bytes, opts).await
             }
 
-            async fn put_multipart(
-                &self,
-                location: &Path,
-            ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+            async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
                 self.as_ref().put_multipart(location).await
             }
 
-            async fn abort_multipart(
-                &self,
-                location: &Path,
-                multipart_id: &MultipartId,
-            ) -> Result<()> {
-                self.as_ref().abort_multipart(location, multipart_id).await
-            }
-
             async fn get(&self, location: &Path) -> Result<GetResult> {
                 self.as_ref().get(location).await
             }
@@ -1246,14 +1196,12 @@ mod test_util {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::multipart::MultiPartStore;
+    use crate::multipart::MultipartStore;
     use crate::test_util::flatten_list_stream;
     use chrono::TimeZone;
     use futures::stream::FuturesUnordered;
     use rand::distributions::Alphanumeric;
     use rand::{thread_rng, Rng};
-    use std::future::Future;
-    use tokio::io::AsyncWriteExt;
 
     pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) {
         put_get_delete_list_opts(storage).await
@@ -1928,12 +1876,11 @@ mod tests {
         let location = Path::from("test_dir/test_upload_file.txt");
 
         // Can write to storage
-        let data = get_chunks(5_000, 10);
+        let data = get_chunks(5 * 1024 * 1024, 3);
         let bytes_expected = data.concat();
-        let (_, mut writer) = storage.put_multipart(&location).await.unwrap();
-        for chunk in &data {
-            writer.write_all(chunk).await.unwrap();
-        }
+        let mut upload = storage.put_multipart(&location).await.unwrap();
+        let uploads = data.into_iter().map(|x| upload.put_part(x));
+        futures::future::try_join_all(uploads).await.unwrap();
 
         // Object should not yet exist in store
         let meta_res = storage.head(&location).await;
@@ -1949,7 +1896,8 @@ mod tests {
         let result = storage.list_with_delimiter(None).await.unwrap();
         assert_eq!(&result.objects, &[]);
 
-        writer.shutdown().await.unwrap();
+        upload.complete().await.unwrap();
+
         let bytes_written = 
storage.get(&location).await.unwrap().bytes().await.unwrap();
         assert_eq!(bytes_expected, bytes_written);
 
@@ -1957,22 +1905,19 @@ mod tests {
         // Sizes chosen to ensure we write three parts
         let data = get_chunks(3_200_000, 7);
         let bytes_expected = data.concat();
-        let (_, mut writer) = storage.put_multipart(&location).await.unwrap();
+        let upload = storage.put_multipart(&location).await.unwrap();
+        let mut writer = WriteMultipart::new(upload);
         for chunk in &data {
-            writer.write_all(chunk).await.unwrap();
+            writer.write(chunk)
         }
-        writer.shutdown().await.unwrap();
+        writer.finish().await.unwrap();
         let bytes_written = 
storage.get(&location).await.unwrap().bytes().await.unwrap();
         assert_eq!(bytes_expected, bytes_written);
 
         // We can abort an empty write
         let location = Path::from("test_dir/test_abort_upload.txt");
-        let (upload_id, writer) = 
storage.put_multipart(&location).await.unwrap();
-        drop(writer);
-        storage
-            .abort_multipart(&location, &upload_id)
-            .await
-            .unwrap();
+        let mut upload = storage.put_multipart(&location).await.unwrap();
+        upload.abort().await.unwrap();
         let get_res = storage.get(&location).await;
         assert!(get_res.is_err());
         assert!(matches!(
@@ -1981,17 +1926,13 @@ mod tests {
         ));
 
         // We can abort an in-progress write
-        let (upload_id, mut writer) = 
storage.put_multipart(&location).await.unwrap();
-        if let Some(chunk) = data.first() {
-            writer.write_all(chunk).await.unwrap();
-            let _ = writer.write(chunk).await.unwrap();
-        }
-        drop(writer);
-
-        storage
-            .abort_multipart(&location, &upload_id)
+        let mut upload = storage.put_multipart(&location).await.unwrap();
+        upload
+            .put_part(data.first().unwrap().clone())
             .await
             .unwrap();
+
+        upload.abort().await.unwrap();
         let get_res = storage.get(&location).await;
         assert!(get_res.is_err());
         assert!(matches!(
@@ -2186,7 +2127,7 @@ mod tests {
         storage.delete(&path2).await.unwrap();
     }
 
-    pub(crate) async fn multipart(storage: &dyn ObjectStore, multipart: &dyn 
MultiPartStore) {
+    pub(crate) async fn multipart(storage: &dyn ObjectStore, multipart: &dyn 
MultipartStore) {
         let path = Path::from("test_multipart");
         let chunk_size = 5 * 1024 * 1024;
 
@@ -2253,7 +2194,7 @@ mod tests {
     pub(crate) async fn tagging<F, Fut>(storage: &dyn ObjectStore, validate: 
bool, get_tags: F)
     where
         F: Fn(Path) -> Fut + Send + Sync,
-        Fut: Future<Output = Result<reqwest::Response>> + Send,
+        Fut: std::future::Future<Output = Result<reqwest::Response>> + Send,
     {
         use bytes::Buf;
         use serde::Deserialize;
diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs
index d1363d9a4d4..e5f6841638e 100644
--- a/object_store/src/limit.rs
+++ b/object_store/src/limit.rs
@@ -18,18 +18,16 @@
 //! An object store that limits the maximum concurrency of the wrapped 
implementation
 
 use crate::{
-    BoxStream, GetOptions, GetResult, GetResultPayload, ListResult, 
MultipartId, ObjectMeta,
-    ObjectStore, Path, PutOptions, PutResult, Result, StreamExt,
+    BoxStream, GetOptions, GetResult, GetResultPayload, ListResult, 
MultipartUpload, ObjectMeta,
+    ObjectStore, Path, PutOptions, PutResult, Result, StreamExt, UploadPart,
 };
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::{FutureExt, Stream};
-use std::io::{Error, IoSlice};
 use std::ops::Range;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
-use tokio::io::AsyncWrite;
 use tokio::sync::{OwnedSemaphorePermit, Semaphore};
 
 /// Store wrapper that wraps an inner store and limits the maximum number of 
concurrent
@@ -81,18 +79,12 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
         let _permit = self.semaphore.acquire().await.unwrap();
         self.inner.put_opts(location, bytes, opts).await
     }
-    async fn put_multipart(
-        &self,
-        location: &Path,
-    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
-        let permit = 
Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
-        let (id, write) = self.inner.put_multipart(location).await?;
-        Ok((id, Box::new(PermitWrapper::new(write, permit))))
-    }
-
-    async fn abort_multipart(&self, location: &Path, multipart_id: 
&MultipartId) -> Result<()> {
-        let _permit = self.semaphore.acquire().await.unwrap();
-        self.inner.abort_multipart(location, multipart_id).await
+    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
+        let upload = self.inner.put_multipart(location).await?;
+        Ok(Box::new(LimitUpload {
+            semaphore: Arc::clone(&self.semaphore),
+            upload,
+        }))
     }
     async fn get(&self, location: &Path) -> Result<GetResult> {
         let permit = 
Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
@@ -221,39 +213,42 @@ impl<T: Stream + Unpin> Stream for PermitWrapper<T> {
     }
 }
 
-impl<T: AsyncWrite + Unpin> AsyncWrite for PermitWrapper<T> {
-    fn poll_write(
-        mut self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-        buf: &[u8],
-    ) -> Poll<std::result::Result<usize, Error>> {
-        Pin::new(&mut self.inner).poll_write(cx, buf)
-    }
+/// An [`MultipartUpload`] wrapper that limits the maximum number of 
concurrent requests
+#[derive(Debug)]
+pub struct LimitUpload {
+    upload: Box<dyn MultipartUpload>,
+    semaphore: Arc<Semaphore>,
+}
 
-    fn poll_flush(
-        mut self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<std::result::Result<(), Error>> {
-        Pin::new(&mut self.inner).poll_flush(cx)
+impl LimitUpload {
+    /// Create a new [`LimitUpload`] limiting `upload` to `max_concurrency` 
concurrent requests
+    pub fn new(upload: Box<dyn MultipartUpload>, max_concurrency: usize) -> 
Self {
+        Self {
+            upload,
+            semaphore: Arc::new(Semaphore::new(max_concurrency)),
+        }
     }
+}
 
-    fn poll_shutdown(
-        mut self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<std::result::Result<(), Error>> {
-        Pin::new(&mut self.inner).poll_shutdown(cx)
+#[async_trait]
+impl MultipartUpload for LimitUpload {
+    fn put_part(&mut self, data: Bytes) -> UploadPart {
+        let upload = self.upload.put_part(data);
+        let s = Arc::clone(&self.semaphore);
+        Box::pin(async move {
+            let _permit = s.acquire().await.unwrap();
+            upload.await
+        })
     }
 
-    fn poll_write_vectored(
-        mut self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-        bufs: &[IoSlice<'_>],
-    ) -> Poll<std::result::Result<usize, Error>> {
-        Pin::new(&mut self.inner).poll_write_vectored(cx, bufs)
+    async fn complete(&mut self) -> Result<PutResult> {
+        let _permit = self.semaphore.acquire().await.unwrap();
+        self.upload.complete().await
     }
 
-    fn is_write_vectored(&self) -> bool {
-        self.inner.is_write_vectored()
+    async fn abort(&mut self) -> Result<()> {
+        let _permit = self.semaphore.acquire().await.unwrap();
+        self.upload.abort().await
     }
 }
 
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index d631771778d..a7eb4661f68 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -16,34 +16,32 @@
 // under the License.
 
 //! An object store implementation for a local filesystem
-use crate::{
-    maybe_spawn_blocking,
-    path::{absolute_path_to_url, Path},
-    util::InvalidGetRange,
-    GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, 
ObjectMeta, ObjectStore,
-    PutMode, PutOptions, PutResult, Result,
-};
-use async_trait::async_trait;
-use bytes::Bytes;
-use chrono::{DateTime, Utc};
-use futures::future::BoxFuture;
-use futures::ready;
-use futures::{stream::BoxStream, StreamExt};
-use futures::{FutureExt, TryStreamExt};
-use snafu::{ensure, ResultExt, Snafu};
 use std::fs::{metadata, symlink_metadata, File, Metadata, OpenOptions};
 use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
 use std::ops::Range;
-use std::pin::Pin;
 use std::sync::Arc;
-use std::task::Poll;
 use std::time::SystemTime;
 use std::{collections::BTreeSet, convert::TryFrom, io};
 use std::{collections::VecDeque, path::PathBuf};
-use tokio::io::AsyncWrite;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use chrono::{DateTime, Utc};
+use futures::{stream::BoxStream, StreamExt};
+use futures::{FutureExt, TryStreamExt};
+use parking_lot::Mutex;
+use snafu::{ensure, OptionExt, ResultExt, Snafu};
 use url::Url;
 use walkdir::{DirEntry, WalkDir};
 
+use crate::{
+    maybe_spawn_blocking,
+    path::{absolute_path_to_url, Path},
+    util::InvalidGetRange,
+    GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, 
ObjectMeta, ObjectStore,
+    PutMode, PutOptions, PutResult, Result, UploadPart,
+};
+
 /// A specialized `Error` for filesystem object store-related errors
 #[derive(Debug, Snafu)]
 #[allow(missing_docs)]
@@ -155,6 +153,9 @@ pub(crate) enum Error {
     InvalidPath {
         path: String,
     },
+
+    #[snafu(display("Upload aborted"))]
+    Aborted,
 }
 
 impl From<Error> for super::Error {
@@ -342,8 +343,7 @@ impl ObjectStore for LocalFileSystem {
 
         let path = self.path_to_filesystem(location)?;
         maybe_spawn_blocking(move || {
-            let (mut file, suffix) = new_staged_upload(&path)?;
-            let staging_path = staged_upload_path(&path, &suffix);
+            let (mut file, staging_path) = new_staged_upload(&path)?;
             let mut e_tag = None;
 
             let err = match file.write_all(&bytes) {
@@ -395,31 +395,10 @@ impl ObjectStore for LocalFileSystem {
         .await
     }
 
-    async fn put_multipart(
-        &self,
-        location: &Path,
-    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
-        let dest = self.path_to_filesystem(location)?;
-
-        let (file, suffix) = new_staged_upload(&dest)?;
-        Ok((
-            suffix.clone(),
-            Box::new(LocalUpload::new(dest, suffix, Arc::new(file))),
-        ))
-    }
-
-    async fn abort_multipart(&self, location: &Path, multipart_id: 
&MultipartId) -> Result<()> {
+    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
         let dest = self.path_to_filesystem(location)?;
-        let path: PathBuf = staged_upload_path(&dest, multipart_id);
-
-        maybe_spawn_blocking(move || match std::fs::remove_file(&path) {
-            Ok(_) => Ok(()),
-            Err(source) => match source.kind() {
-                ErrorKind::NotFound => Ok(()), // Already deleted
-                _ => Err(Error::UnableToDeleteFile { path, source }.into()),
-            },
-        })
-        .await
+        let (file, src) = new_staged_upload(&dest)?;
+        Ok(Box::new(LocalUpload::new(src, dest, file)))
     }
 
     async fn get_opts(&self, location: &Path, options: GetOptions) -> 
Result<GetResult> {
@@ -677,17 +656,17 @@ fn create_parent_dirs(path: &std::path::Path, source: 
io::Error) -> Result<()> {
     Ok(())
 }
 
-/// Generates a unique file path `{base}#{suffix}`, returning the opened 
`File` and `suffix`
+/// Generates a unique file path `{base}#{suffix}`, returning the opened 
`File` and `path`
 ///
 /// Creates any directories if necessary
-fn new_staged_upload(base: &std::path::Path) -> Result<(File, String)> {
+fn new_staged_upload(base: &std::path::Path) -> Result<(File, PathBuf)> {
     let mut multipart_id = 1;
     loop {
         let suffix = multipart_id.to_string();
         let path = staged_upload_path(base, &suffix);
         let mut options = OpenOptions::new();
         match options.read(true).write(true).create_new(true).open(&path) {
-            Ok(f) => return Ok((f, suffix)),
+            Ok(f) => return Ok((f, path)),
             Err(source) => match source.kind() {
                 ErrorKind::AlreadyExists => multipart_id += 1,
                 ErrorKind::NotFound => create_parent_dirs(&path, source)?,
@@ -705,194 +684,91 @@ fn staged_upload_path(dest: &std::path::Path, suffix: 
&str) -> PathBuf {
     staging_path.into()
 }
 
-enum LocalUploadState {
-    /// Upload is ready to send new data
-    Idle(Arc<File>),
-    /// In the middle of a write
-    Writing(Arc<File>, BoxFuture<'static, Result<usize, io::Error>>),
-    /// In the middle of syncing data and closing file.
-    ///
-    /// Future will contain last reference to file, so it will call drop on 
completion.
-    ShuttingDown(BoxFuture<'static, Result<(), io::Error>>),
-    /// File is being moved from it's temporary location to the final location
-    Committing(BoxFuture<'static, Result<(), io::Error>>),
-    /// Upload is complete
-    Complete,
+#[derive(Debug)]
+struct LocalUpload {
+    /// The upload state
+    state: Arc<UploadState>,
+    /// The location of the temporary file
+    src: Option<PathBuf>,
+    /// The next offset to write into the file
+    offset: u64,
 }
 
-struct LocalUpload {
-    inner_state: LocalUploadState,
+#[derive(Debug)]
+struct UploadState {
     dest: PathBuf,
-    multipart_id: MultipartId,
+    file: Mutex<Option<File>>,
 }
 
 impl LocalUpload {
-    pub fn new(dest: PathBuf, multipart_id: MultipartId, file: Arc<File>) -> 
Self {
+    pub fn new(src: PathBuf, dest: PathBuf, file: File) -> Self {
         Self {
-            inner_state: LocalUploadState::Idle(file),
-            dest,
-            multipart_id,
+            state: Arc::new(UploadState {
+                dest,
+                file: Mutex::new(Some(file)),
+            }),
+            src: Some(src),
+            offset: 0,
         }
     }
 }
 
-impl AsyncWrite for LocalUpload {
-    fn poll_write(
-        mut self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-        buf: &[u8],
-    ) -> Poll<Result<usize, io::Error>> {
-        let invalid_state = |condition: &str| -> Poll<Result<usize, 
io::Error>> {
-            Poll::Ready(Err(io::Error::new(
-                ErrorKind::InvalidInput,
-                format!("Tried to write to file {condition}."),
-            )))
-        };
+#[async_trait]
+impl MultipartUpload for LocalUpload {
+    fn put_part(&mut self, data: Bytes) -> UploadPart {
+        let offset = self.offset;
+        self.offset += data.len() as u64;
 
-        if let Ok(runtime) = tokio::runtime::Handle::try_current() {
-            let mut data: Vec<u8> = buf.to_vec();
-            let data_len = data.len();
-
-            loop {
-                match &mut self.inner_state {
-                    LocalUploadState::Idle(file) => {
-                        let file = Arc::clone(file);
-                        let file2 = Arc::clone(&file);
-                        let data: Vec<u8> = std::mem::take(&mut data);
-                        self.inner_state = LocalUploadState::Writing(
-                            file,
-                            Box::pin(
-                                runtime
-                                    .spawn_blocking(move || 
(&*file2).write_all(&data))
-                                    .map(move |res| match res {
-                                        Err(err) => 
Err(io::Error::new(ErrorKind::Other, err)),
-                                        Ok(res) => res.map(move |_| data_len),
-                                    }),
-                            ),
-                        );
-                    }
-                    LocalUploadState::Writing(file, inner_write) => {
-                        let res = ready!(inner_write.poll_unpin(cx));
-                        self.inner_state = 
LocalUploadState::Idle(Arc::clone(file));
-                        return Poll::Ready(res);
-                    }
-                    LocalUploadState::ShuttingDown(_) => {
-                        return invalid_state("when writer is shutting down");
-                    }
-                    LocalUploadState::Committing(_) => {
-                        return invalid_state("when writer is committing data");
-                    }
-                    LocalUploadState::Complete => {
-                        return invalid_state("when writer is complete");
-                    }
-                }
-            }
-        } else if let LocalUploadState::Idle(file) = &self.inner_state {
-            let file = Arc::clone(file);
-            (&*file).write_all(buf)?;
-            Poll::Ready(Ok(buf.len()))
-        } else {
-            // If we are running on this thread, then only possible states are 
Idle and Complete.
-            invalid_state("when writer is already complete.")
-        }
+        let s = Arc::clone(&self.state);
+        maybe_spawn_blocking(move || {
+            let mut f = s.file.lock();
+            let file = f.as_mut().context(AbortedSnafu)?;
+            file.seek(SeekFrom::Start(offset))
+                .context(SeekSnafu { path: &s.dest })?;
+            file.write_all(&data).context(UnableToCopyDataToFileSnafu)?;
+            Ok(())
+        })
+        .boxed()
     }
 
-    fn poll_flush(
-        self: Pin<&mut Self>,
-        _cx: &mut std::task::Context<'_>,
-    ) -> Poll<Result<(), io::Error>> {
-        Poll::Ready(Ok(()))
+    async fn complete(&mut self) -> Result<PutResult> {
+        let src = self.src.take().context(AbortedSnafu)?;
+        let s = Arc::clone(&self.state);
+        maybe_spawn_blocking(move || {
+            // Ensure no inflight writes
+            let f = s.file.lock().take().context(AbortedSnafu)?;
+            std::fs::rename(&src, &s.dest).context(UnableToRenameFileSnafu)?;
+            let metadata = f.metadata().map_err(|e| Error::Metadata {
+                source: e.into(),
+                path: src.to_string_lossy().to_string(),
+            })?;
+
+            Ok(PutResult {
+                e_tag: Some(get_etag(&metadata)),
+                version: None,
+            })
+        })
+        .await
     }
 
-    fn poll_shutdown(
-        mut self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<Result<(), io::Error>> {
-        if let Ok(runtime) = tokio::runtime::Handle::try_current() {
-            loop {
-                match &mut self.inner_state {
-                    LocalUploadState::Idle(file) => {
-                        // We are moving file into the future, and it will be 
dropped on it's completion, closing the file.
-                        let file = Arc::clone(file);
-                        self.inner_state = 
LocalUploadState::ShuttingDown(Box::pin(
-                            runtime
-                                .spawn_blocking(move || (*file).sync_all())
-                                .map(move |res| match res {
-                                    Err(err) => 
Err(io::Error::new(io::ErrorKind::Other, err)),
-                                    Ok(res) => res,
-                                }),
-                        ));
-                    }
-                    LocalUploadState::ShuttingDown(fut) => match 
fut.poll_unpin(cx) {
-                        Poll::Ready(res) => {
-                            res?;
-                            let staging_path = staged_upload_path(&self.dest, 
&self.multipart_id);
-                            let dest = self.dest.clone();
-                            self.inner_state = 
LocalUploadState::Committing(Box::pin(
-                                runtime
-                                    .spawn_blocking(move || 
std::fs::rename(&staging_path, &dest))
-                                    .map(move |res| match res {
-                                        Err(err) => 
Err(io::Error::new(io::ErrorKind::Other, err)),
-                                        Ok(res) => res,
-                                    }),
-                            ));
-                        }
-                        Poll::Pending => {
-                            return Poll::Pending;
-                        }
-                    },
-                    LocalUploadState::Writing(_, _) => {
-                        return Poll::Ready(Err(io::Error::new(
-                            io::ErrorKind::InvalidInput,
-                            "Tried to commit a file where a write is in 
progress.",
-                        )));
-                    }
-                    LocalUploadState::Committing(fut) => {
-                        let res = ready!(fut.poll_unpin(cx));
-                        self.inner_state = LocalUploadState::Complete;
-                        return Poll::Ready(res);
-                    }
-                    LocalUploadState::Complete => {
-                        return Poll::Ready(Err(io::Error::new(
-                            io::ErrorKind::Other,
-                            "Already complete",
-                        )))
-                    }
-                }
-            }
-        } else {
-            let staging_path = staged_upload_path(&self.dest, 
&self.multipart_id);
-            match &mut self.inner_state {
-                LocalUploadState::Idle(file) => {
-                    let file = Arc::clone(file);
-                    self.inner_state = LocalUploadState::Complete;
-                    file.sync_all()?;
-                    drop(file);
-                    std::fs::rename(staging_path, &self.dest)?;
-                    Poll::Ready(Ok(()))
-                }
-                _ => {
-                    // If we are running on this thread, then only possible 
states are Idle and Complete.
-                    Poll::Ready(Err(io::Error::new(ErrorKind::Other, "Already 
complete")))
-                }
-            }
-        }
+    async fn abort(&mut self) -> Result<()> {
+        let src = self.src.take().context(AbortedSnafu)?;
+        maybe_spawn_blocking(move || {
+            std::fs::remove_file(&src).context(UnableToDeleteFileSnafu { path: 
&src })?;
+            Ok(())
+        })
+        .await
     }
 }
 
 impl Drop for LocalUpload {
     fn drop(&mut self) {
-        match self.inner_state {
-            LocalUploadState::Complete => (),
-            _ => {
-                self.inner_state = LocalUploadState::Complete;
-                let path = staged_upload_path(&self.dest, &self.multipart_id);
-                // Try to cleanup intermediate file ignoring any error
-                match tokio::runtime::Handle::try_current() {
-                    Ok(r) => drop(r.spawn_blocking(move || 
std::fs::remove_file(path))),
-                    Err(_) => drop(std::fs::remove_file(path)),
-                };
-            }
+        if let Some(src) = self.src.take() {
+            // Try to clean up intermediate file ignoring any error
+            match tokio::runtime::Handle::try_current() {
+                Ok(r) => drop(r.spawn_blocking(move || 
std::fs::remove_file(src))),
+                Err(_) => drop(std::fs::remove_file(src)),
+            };
         }
     }
 }
@@ -1095,12 +971,13 @@ fn convert_walkdir_result(
 
 #[cfg(test)]
 mod tests {
-    use super::*;
-    use crate::test_util::flatten_list_stream;
-    use crate::tests::*;
     use futures::TryStreamExt;
     use tempfile::{NamedTempFile, TempDir};
-    use tokio::io::AsyncWriteExt;
+
+    use crate::test_util::flatten_list_stream;
+    use crate::tests::*;
+
+    use super::*;
 
     #[tokio::test]
     async fn file_test() {
@@ -1125,7 +1002,18 @@ mod tests {
             put_get_delete_list(&integration).await;
             list_uses_directories_correctly(&integration).await;
             list_with_delimiter(&integration).await;
-            stream_get(&integration).await;
+
+            // Can't use stream_get test as WriteMultipart uses a tokio JoinSet
+            let p = Path::from("manual_upload");
+            let mut upload = integration.put_multipart(&p).await.unwrap();
+            upload.put_part(Bytes::from_static(b"123")).await.unwrap();
+            upload.put_part(Bytes::from_static(b"45678")).await.unwrap();
+            let r = upload.complete().await.unwrap();
+
+            let get = integration.get(&p).await.unwrap();
+            assert_eq!(get.meta.e_tag.as_ref().unwrap(), 
r.e_tag.as_ref().unwrap());
+            let actual = get.bytes().await.unwrap();
+            assert_eq!(actual.as_ref(), b"12345678");
         });
     }
 
@@ -1422,12 +1310,11 @@ mod tests {
         let location = Path::from("some_file");
 
         let data = Bytes::from("arbitrary data");
-        let (multipart_id, mut writer) = 
integration.put_multipart(&location).await.unwrap();
-        writer.write_all(&data).await.unwrap();
+        let mut u1 = integration.put_multipart(&location).await.unwrap();
+        u1.put_part(data.clone()).await.unwrap();
 
-        let (multipart_id_2, mut writer_2) = 
integration.put_multipart(&location).await.unwrap();
-        assert_ne!(multipart_id, multipart_id_2);
-        writer_2.write_all(&data).await.unwrap();
+        let mut u2 = integration.put_multipart(&location).await.unwrap();
+        u2.put_part(data).await.unwrap();
 
         let list = flatten_list_stream(&integration, None).await.unwrap();
         assert_eq!(list.len(), 0);
@@ -1520,11 +1407,13 @@ mod tests {
 #[cfg(not(target_arch = "wasm32"))]
 #[cfg(test)]
 mod not_wasm_tests {
-    use crate::local::LocalFileSystem;
-    use crate::{ObjectStore, Path};
     use std::time::Duration;
+
+    use bytes::Bytes;
     use tempfile::TempDir;
-    use tokio::io::AsyncWriteExt;
+
+    use crate::local::LocalFileSystem;
+    use crate::{ObjectStore, Path};
 
     #[tokio::test]
     async fn test_cleanup_intermediate_files() {
@@ -1532,12 +1421,13 @@ mod not_wasm_tests {
         let integration = 
LocalFileSystem::new_with_prefix(root.path()).unwrap();
 
         let location = Path::from("some_file");
-        let (_, mut writer) = 
integration.put_multipart(&location).await.unwrap();
-        writer.write_all(b"hello").await.unwrap();
+        let data = Bytes::from_static(b"hello");
+        let mut upload = integration.put_multipart(&location).await.unwrap();
+        upload.put_part(data).await.unwrap();
 
         let file_count = std::fs::read_dir(root.path()).unwrap().count();
         assert_eq!(file_count, 1);
-        drop(writer);
+        drop(upload);
 
         tokio::time::sleep(Duration::from_millis(1)).await;
 
@@ -1549,13 +1439,15 @@ mod not_wasm_tests {
 #[cfg(target_family = "unix")]
 #[cfg(test)]
 mod unix_test {
-    use crate::local::LocalFileSystem;
-    use crate::{ObjectStore, Path};
+    use std::fs::OpenOptions;
+
     use nix::sys::stat;
     use nix::unistd;
-    use std::fs::OpenOptions;
     use tempfile::TempDir;
 
+    use crate::local::LocalFileSystem;
+    use crate::{ObjectStore, Path};
+
     #[tokio::test]
     async fn test_fifo() {
         let filename = "some_file";
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index 41ee1091a3b..6c960d4f24f 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -16,27 +16,24 @@
 // under the License.
 
 //! An in-memory object store implementation
-use crate::multipart::{MultiPartStore, PartId};
-use crate::util::InvalidGetRange;
-use crate::{
-    path::Path, GetRange, GetResult, GetResultPayload, ListResult, ObjectMeta, 
ObjectStore,
-    PutMode, PutOptions, PutResult, Result, UpdateVersion,
-};
-use crate::{GetOptions, MultipartId};
+use std::collections::{BTreeMap, BTreeSet, HashMap};
+use std::ops::Range;
+use std::sync::Arc;
+
 use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
 use futures::{stream::BoxStream, StreamExt};
 use parking_lot::RwLock;
 use snafu::{OptionExt, ResultExt, Snafu};
-use std::collections::BTreeSet;
-use std::collections::{BTreeMap, HashMap};
-use std::io;
-use std::ops::Range;
-use std::pin::Pin;
-use std::sync::Arc;
-use std::task::Poll;
-use tokio::io::AsyncWrite;
+
+use crate::multipart::{MultipartStore, PartId};
+use crate::util::InvalidGetRange;
+use crate::GetOptions;
+use crate::{
+    path::Path, GetRange, GetResult, GetResultPayload, ListResult, 
MultipartId, MultipartUpload,
+    ObjectMeta, ObjectStore, PutMode, PutOptions, PutResult, Result, 
UpdateVersion, UploadPart,
+};
 
 /// A specialized `Error` for in-memory object store-related errors
 #[derive(Debug, Snafu)]
@@ -213,23 +210,12 @@ impl ObjectStore for InMemory {
         })
     }
 
-    async fn put_multipart(
-        &self,
-        location: &Path,
-    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
-        Ok((
-            String::new(),
-            Box::new(InMemoryUpload {
-                location: location.clone(),
-                data: Vec::new(),
-                storage: Arc::clone(&self.storage),
-            }),
-        ))
-    }
-
-    async fn abort_multipart(&self, _location: &Path, _multipart_id: 
&MultipartId) -> Result<()> {
-        // Nothing to clean up
-        Ok(())
+    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
+        Ok(Box::new(InMemoryUpload {
+            location: location.clone(),
+            parts: vec![],
+            storage: Arc::clone(&self.storage),
+        }))
     }
 
     async fn get_opts(&self, location: &Path, options: GetOptions) -> 
Result<GetResult> {
@@ -391,7 +377,7 @@ impl ObjectStore for InMemory {
 }
 
 #[async_trait]
-impl MultiPartStore for InMemory {
+impl MultipartStore for InMemory {
     async fn create_multipart(&self, _path: &Path) -> Result<MultipartId> {
         let mut storage = self.storage.write();
         let etag = storage.next_etag;
@@ -482,45 +468,42 @@ impl InMemory {
     }
 }
 
+#[derive(Debug)]
 struct InMemoryUpload {
     location: Path,
-    data: Vec<u8>,
+    parts: Vec<Bytes>,
     storage: Arc<RwLock<Storage>>,
 }
 
-impl AsyncWrite for InMemoryUpload {
-    fn poll_write(
-        mut self: Pin<&mut Self>,
-        _cx: &mut std::task::Context<'_>,
-        buf: &[u8],
-    ) -> Poll<Result<usize, io::Error>> {
-        self.data.extend_from_slice(buf);
-        Poll::Ready(Ok(buf.len()))
+#[async_trait]
+impl MultipartUpload for InMemoryUpload {
+    fn put_part(&mut self, data: Bytes) -> UploadPart {
+        self.parts.push(data);
+        Box::pin(futures::future::ready(Ok(())))
     }
 
-    fn poll_flush(
-        self: Pin<&mut Self>,
-        _cx: &mut std::task::Context<'_>,
-    ) -> Poll<Result<(), io::Error>> {
-        Poll::Ready(Ok(()))
+    async fn complete(&mut self) -> Result<PutResult> {
+        let cap = self.parts.iter().map(|x| x.len()).sum();
+        let mut buf = Vec::with_capacity(cap);
+        self.parts.iter().for_each(|x| buf.extend_from_slice(x));
+        let etag = self.storage.write().insert(&self.location, buf.into());
+        Ok(PutResult {
+            e_tag: Some(etag.to_string()),
+            version: None,
+        })
     }
 
-    fn poll_shutdown(
-        mut self: Pin<&mut Self>,
-        _cx: &mut std::task::Context<'_>,
-    ) -> Poll<Result<(), io::Error>> {
-        let data = Bytes::from(std::mem::take(&mut self.data));
-        self.storage.write().insert(&self.location, data);
-        Poll::Ready(Ok(()))
+    async fn abort(&mut self) -> Result<()> {
+        Ok(())
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use super::*;
-
     use crate::tests::*;
 
+    use super::*;
+
     #[tokio::test]
     async fn in_memory_test() {
         let integration = InMemory::new();
diff --git a/object_store/src/multipart.rs b/object_store/src/multipart.rs
index 1dcd5a6f496..26cce393624 100644
--- a/object_store/src/multipart.rs
+++ b/object_store/src/multipart.rs
@@ -17,34 +17,16 @@
 
 //! Cloud Multipart Upload
 //!
-//! This crate provides an asynchronous interface for multipart file uploads 
to cloud storage services.
-//! It's designed to offer efficient, non-blocking operations,
+//! This crate provides an asynchronous interface for multipart file uploads to
+//! cloud storage services. It's designed to offer efficient, non-blocking 
operations,
 //! especially useful when dealing with large files or high-throughput systems.
 
 use async_trait::async_trait;
 use bytes::Bytes;
-use futures::{stream::FuturesUnordered, Future, StreamExt};
-use std::{io, pin::Pin, sync::Arc, task::Poll};
-use tokio::io::AsyncWrite;
 
 use crate::path::Path;
 use crate::{MultipartId, PutResult, Result};
 
-type BoxedTryFuture<T> = Pin<Box<dyn Future<Output = Result<T, io::Error>> + 
Send>>;
-
-/// A trait used in combination with [`WriteMultiPart`] to implement
-/// [`AsyncWrite`] on top of an API for multipart upload
-#[async_trait]
-pub trait PutPart: Send + Sync + 'static {
-    /// Upload a single part
-    async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId>;
-
-    /// Complete the upload with the provided parts
-    ///
-    /// `completed_parts` is in order of part number
-    async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()>;
-}
-
 /// Represents a part of a file that has been successfully uploaded in a 
multipart upload process.
 #[derive(Debug, Clone)]
 pub struct PartId {
@@ -52,222 +34,6 @@ pub struct PartId {
     pub content_id: String,
 }
 
-/// Wrapper around a [`PutPart`] that implements [`AsyncWrite`]
-///
-/// Data will be uploaded in fixed size chunks of 10 MiB in parallel,
-/// up to the configured maximum concurrency
-pub struct WriteMultiPart<T: PutPart> {
-    inner: Arc<T>,
-    /// A list of completed parts, in sequential order.
-    completed_parts: Vec<Option<PartId>>,
-    /// Part upload tasks currently running
-    tasks: FuturesUnordered<BoxedTryFuture<(usize, PartId)>>,
-    /// Maximum number of upload tasks to run concurrently
-    max_concurrency: usize,
-    /// Buffer that will be sent in next upload.
-    current_buffer: Vec<u8>,
-    /// Size of each part.
-    ///
-    /// While S3 and Minio support variable part sizes, R2 requires they all be
-    /// exactly the same size.
-    part_size: usize,
-    /// Index of current part
-    current_part_idx: usize,
-    /// The completion task
-    completion_task: Option<BoxedTryFuture<()>>,
-}
-
-impl<T: PutPart> WriteMultiPart<T> {
-    /// Create a new multipart upload with the implementation and the given 
maximum concurrency
-    pub fn new(inner: T, max_concurrency: usize) -> Self {
-        Self {
-            inner: Arc::new(inner),
-            completed_parts: Vec::new(),
-            tasks: FuturesUnordered::new(),
-            max_concurrency,
-            current_buffer: Vec::new(),
-            // TODO: Should self vary by provider?
-            // TODO: Should we automatically increase then when part index 
gets large?
-
-            // Minimum size of 5 MiB
-            // 
https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
-            // https://cloud.google.com/storage/quotas#requests
-            part_size: 10 * 1024 * 1024,
-            current_part_idx: 0,
-            completion_task: None,
-        }
-    }
-
-    // Add data to the current buffer, returning the number of bytes added
-    fn add_to_buffer(mut self: Pin<&mut Self>, buf: &[u8], offset: usize) -> 
usize {
-        let remaining_capacity = self.part_size - self.current_buffer.len();
-        let to_copy = std::cmp::min(remaining_capacity, buf.len() - offset);
-        self.current_buffer
-            .extend_from_slice(&buf[offset..offset + to_copy]);
-        to_copy
-    }
-
-    /// Poll current tasks
-    fn poll_tasks(
-        mut self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> Result<(), io::Error> {
-        if self.tasks.is_empty() {
-            return Ok(());
-        }
-        while let Poll::Ready(Some(res)) = self.tasks.poll_next_unpin(cx) {
-            let (part_idx, part) = res?;
-            let total_parts = self.completed_parts.len();
-            self.completed_parts
-                .resize(std::cmp::max(part_idx + 1, total_parts), None);
-            self.completed_parts[part_idx] = Some(part);
-        }
-        Ok(())
-    }
-
-    // The `poll_flush` function will only flush the in-progress tasks.
-    // The `final_flush` method called during `poll_shutdown` will flush
-    // the `current_buffer` along with in-progress tasks.
-    // Please see https://github.com/apache/arrow-rs/issues/3390 for more 
details.
-    fn final_flush(
-        mut self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<Result<(), io::Error>> {
-        // Poll current tasks
-        self.as_mut().poll_tasks(cx)?;
-
-        // If current_buffer is not empty, see if it can be submitted
-        if !self.current_buffer.is_empty() && self.tasks.len() < 
self.max_concurrency {
-            let out_buffer: Vec<u8> = std::mem::take(&mut self.current_buffer);
-            let inner = Arc::clone(&self.inner);
-            let part_idx = self.current_part_idx;
-            self.tasks.push(Box::pin(async move {
-                let upload_part = inner.put_part(out_buffer, part_idx).await?;
-                Ok((part_idx, upload_part))
-            }));
-        }
-
-        self.as_mut().poll_tasks(cx)?;
-
-        // If tasks and current_buffer are empty, return Ready
-        if self.tasks.is_empty() && self.current_buffer.is_empty() {
-            Poll::Ready(Ok(()))
-        } else {
-            Poll::Pending
-        }
-    }
-}
-
-impl<T: PutPart> AsyncWrite for WriteMultiPart<T> {
-    fn poll_write(
-        mut self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-        buf: &[u8],
-    ) -> Poll<Result<usize, io::Error>> {
-        // Poll current tasks
-        self.as_mut().poll_tasks(cx)?;
-
-        let mut offset = 0;
-
-        loop {
-            // Fill up current buffer
-            offset += self.as_mut().add_to_buffer(buf, offset);
-
-            // If we don't have a full buffer or we have too many tasks, break
-            if self.current_buffer.len() < self.part_size
-                || self.tasks.len() >= self.max_concurrency
-            {
-                break;
-            }
-
-            let new_buffer = Vec::with_capacity(self.part_size);
-            let out_buffer = std::mem::replace(&mut self.current_buffer, 
new_buffer);
-            let inner = Arc::clone(&self.inner);
-            let part_idx = self.current_part_idx;
-            self.tasks.push(Box::pin(async move {
-                let upload_part = inner.put_part(out_buffer, part_idx).await?;
-                Ok((part_idx, upload_part))
-            }));
-            self.current_part_idx += 1;
-
-            // We need to poll immediately after adding to setup waker
-            self.as_mut().poll_tasks(cx)?;
-        }
-
-        // If offset is zero, then we didn't write anything because we didn't
-        // have capacity for more tasks and our buffer is full.
-        if offset == 0 && !buf.is_empty() {
-            Poll::Pending
-        } else {
-            Poll::Ready(Ok(offset))
-        }
-    }
-
-    fn poll_flush(
-        mut self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<Result<(), io::Error>> {
-        // Poll current tasks
-        self.as_mut().poll_tasks(cx)?;
-
-        // If tasks is empty, return Ready
-        if self.tasks.is_empty() {
-            Poll::Ready(Ok(()))
-        } else {
-            Poll::Pending
-        }
-    }
-
-    fn poll_shutdown(
-        mut self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<Result<(), io::Error>> {
-        // First, poll flush
-        match self.as_mut().final_flush(cx) {
-            Poll::Pending => return Poll::Pending,
-            Poll::Ready(res) => res?,
-        };
-
-        // If shutdown task is not set, set it
-        let parts = std::mem::take(&mut self.completed_parts);
-        let parts = parts
-            .into_iter()
-            .enumerate()
-            .map(|(idx, part)| {
-                part.ok_or_else(|| {
-                    io::Error::new(
-                        io::ErrorKind::Other,
-                        format!("Missing information for upload part {idx}"),
-                    )
-                })
-            })
-            .collect::<Result<_, _>>()?;
-
-        let inner = Arc::clone(&self.inner);
-        let completion_task = self.completion_task.get_or_insert_with(|| {
-            Box::pin(async move {
-                inner.complete(parts).await?;
-                Ok(())
-            })
-        });
-
-        Pin::new(completion_task).poll(cx)
-    }
-}
-
-impl<T: PutPart> std::fmt::Debug for WriteMultiPart<T> {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("WriteMultiPart")
-            .field("completed_parts", &self.completed_parts)
-            .field("tasks", &self.tasks)
-            .field("max_concurrency", &self.max_concurrency)
-            .field("current_buffer", &self.current_buffer)
-            .field("part_size", &self.part_size)
-            .field("current_part_idx", &self.current_part_idx)
-            .finish()
-    }
-}
-
 /// A low-level interface for interacting with multipart upload APIs
 ///
 /// Most use-cases should prefer [`ObjectStore::put_multipart`] as this is 
supported by more
@@ -277,7 +43,7 @@ impl<T: PutPart> std::fmt::Debug for WriteMultiPart<T> {
 /// [`ObjectStore::put_multipart`]: crate::ObjectStore::put_multipart
 /// [`LocalFileSystem`]: crate::local::LocalFileSystem
 #[async_trait]
-pub trait MultiPartStore: Send + Sync + 'static {
+pub trait MultipartStore: Send + Sync + 'static {
     /// Creates a new multipart upload, returning the [`MultipartId`]
     async fn create_multipart(&self, path: &Path) -> Result<MultipartId>;
 
@@ -288,10 +54,11 @@ pub trait MultiPartStore: Send + Sync + 'static {
     ///
     /// Most stores require that all parts excluding the last are at least 5 
MiB, and some
     /// further require that all parts excluding the last be the same size, 
e.g. [R2].
-    /// [`WriteMultiPart`] performs writes in fixed size blocks of 10 MiB, and 
clients wanting
+    /// [`WriteMultipart`] performs writes in fixed size blocks of 5 MiB, and 
clients wanting
     /// to maximise compatibility should look to do likewise.
     ///
     /// [R2]: 
https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations
+    /// [`WriteMultipart`]: crate::upload::WriteMultipart
     async fn put_part(
         &self,
         path: &Path,
diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs
index 38f9b07bbd0..053f71a2d06 100644
--- a/object_store/src/prefix.rs
+++ b/object_store/src/prefix.rs
@@ -19,12 +19,11 @@
 use bytes::Bytes;
 use futures::{stream::BoxStream, StreamExt, TryStreamExt};
 use std::ops::Range;
-use tokio::io::AsyncWrite;
 
 use crate::path::Path;
 use crate::{
-    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, 
PutOptions, PutResult,
-    Result,
+    GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, 
ObjectStore, PutOptions,
+    PutResult, Result,
 };
 
 #[doc(hidden)]
@@ -91,18 +90,11 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
         self.inner.put_opts(&full_path, bytes, opts).await
     }
 
-    async fn put_multipart(
-        &self,
-        location: &Path,
-    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
         let full_path = self.full_path(location);
         self.inner.put_multipart(&full_path).await
     }
 
-    async fn abort_multipart(&self, location: &Path, multipart_id: 
&MultipartId) -> Result<()> {
-        let full_path = self.full_path(location);
-        self.inner.abort_multipart(&full_path, multipart_id).await
-    }
     async fn get(&self, location: &Path) -> Result<GetResult> {
         let full_path = self.full_path(location);
         self.inner.get(&full_path).await
diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs
index 252256a4599..5ca1eedbf73 100644
--- a/object_store/src/throttle.rs
+++ b/object_store/src/throttle.rs
@@ -20,16 +20,15 @@ use parking_lot::Mutex;
 use std::ops::Range;
 use std::{convert::TryInto, sync::Arc};
 
+use crate::GetOptions;
 use crate::{
-    path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, 
ObjectStore, PutOptions,
-    PutResult, Result,
+    path::Path, GetResult, GetResultPayload, ListResult, MultipartUpload, 
ObjectMeta, ObjectStore,
+    PutOptions, PutResult, Result,
 };
-use crate::{GetOptions, MultipartId};
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::{stream::BoxStream, FutureExt, StreamExt};
 use std::time::Duration;
-use tokio::io::AsyncWrite;
 
 /// Configuration settings for throttled store
 #[derive(Debug, Default, Clone, Copy)]
@@ -158,14 +157,7 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
         self.inner.put_opts(location, bytes, opts).await
     }
 
-    async fn put_multipart(
-        &self,
-        _location: &Path,
-    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
-        Err(super::Error::NotImplemented)
-    }
-
-    async fn abort_multipart(&self, _location: &Path, _multipart_id: 
&MultipartId) -> Result<()> {
+    async fn put_multipart(&self, _location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
         Err(super::Error::NotImplemented)
     }
 
diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs
new file mode 100644
index 00000000000..6f8bfa8a5f7
--- /dev/null
+++ b/object_store/src/upload.rs
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{PutResult, Result};
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use tokio::task::JoinSet;
+
+/// An upload part request
+pub type UploadPart = BoxFuture<'static, Result<()>>;
+
+/// A trait allowing writing an object in fixed size chunks
+///
+/// Consecutive chunks of data can be written by calling 
[`MultipartUpload::put_part`] and polling
+/// the returned futures to completion. Multiple futures returned by 
[`MultipartUpload::put_part`]
+/// may be polled in parallel, allowing for concurrent uploads.
+///
+/// Once all part uploads have been polled to completion, the upload can be 
completed by
+/// calling [`MultipartUpload::complete`]. This will make the entire uploaded 
object visible
+/// as an atomic operation.It is implementation behind behaviour if 
[`MultipartUpload::complete`]
+/// is called before all [`UploadPart`] have been polled to completion.
+#[async_trait]
+pub trait MultipartUpload: Send + std::fmt::Debug {
+    /// Upload the next part
+    ///
+    /// Most stores require that all parts excluding the last are at least 5 
MiB, and some
+    /// further require that all parts excluding the last be the same size, 
e.g. [R2].
+    /// Clients wanting to maximise compatibility should therefore perform 
writes in
+    /// fixed size blocks larger than 5 MiB.
+    ///
+    /// Implementations may invoke this method multiple times and then await 
on the
+    /// returned futures in parallel
+    ///
+    /// ```no_run
+    /// # use futures::StreamExt;
+    /// # use object_store::MultipartUpload;
+    /// #
+    /// # async fn test() {
+    /// #
+    /// let mut upload: Box<&dyn MultipartUpload> = todo!();
+    /// let p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into());
+    /// let p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into());
+    /// futures::future::try_join(p1, p2).await.unwrap();
+    /// upload.complete().await.unwrap();
+    /// # }
+    /// ```
+    ///
+    /// [R2]: 
https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations
+    fn put_part(&mut self, data: Bytes) -> UploadPart;
+
+    /// Complete the multipart upload
+    ///
+    /// It is implementation defined behaviour if this method is called before 
polling
+    /// all [`UploadPart`] returned by [`MultipartUpload::put_part`] to 
completion. Additionally,
+    /// it is implementation defined behaviour to call 
[`MultipartUpload::complete`]
+    /// on an already completed or aborted [`MultipartUpload`].
+    async fn complete(&mut self) -> Result<PutResult>;
+
+    /// Abort the multipart upload
+    ///
+    /// If a [`MultipartUpload`] is dropped without calling 
[`MultipartUpload::complete`],
+    /// some object stores will automatically clean up any previously uploaded 
parts.
+    /// However, some stores, such as S3 and GCS, cannot perform cleanup on 
drop.
+    /// As such [`MultipartUpload::abort`] can be invoked to perform this 
cleanup.
+    ///
+    /// It will not be possible to call `abort` in all failure scenarios, for 
example
+    /// non-graceful shutdown of the calling application. It is therefore 
recommended
+    /// object stores are configured with lifecycle rules to automatically 
cleanup
+    /// unused parts older than some threshold. See [crate::aws] and 
[crate::gcp]
+    /// for more information.
+    ///
+    /// It is implementation defined behaviour to call 
[`MultipartUpload::abort`]
+    /// on an already completed or aborted [`MultipartUpload`]
+    async fn abort(&mut self) -> Result<()>;
+}
+
+/// A synchronous write API for uploading data in parallel in fixed size chunks
+///
+/// Uses multiple tokio tasks in a [`JoinSet`] to multiplex upload tasks in 
parallel
+///
+/// The design also takes inspiration from [`Sink`] with 
[`WriteMultipart::wait_for_capacity`]
+/// allowing back pressure on producers, prior to buffering the next part. 
However, unlike
+/// [`Sink`] this back pressure is optional, allowing integration with 
synchronous producers
+///
+/// [`Sink`]: futures::sink::Sink
+#[derive(Debug)]
+pub struct WriteMultipart {
+    upload: Box<dyn MultipartUpload>,
+
+    buffer: Vec<u8>,
+
+    tasks: JoinSet<Result<()>>,
+}
+
+impl WriteMultipart {
+    /// Create a new [`WriteMultipart`] that will upload using 5MB chunks
+    pub fn new(upload: Box<dyn MultipartUpload>) -> Self {
+        Self::new_with_capacity(upload, 5 * 1024 * 1024)
+    }
+
+    /// Create a new [`WriteMultipart`] that will upload in fixed `capacity` 
sized chunks
+    pub fn new_with_capacity(upload: Box<dyn MultipartUpload>, capacity: 
usize) -> Self {
+        Self {
+            upload,
+            buffer: Vec::with_capacity(capacity),
+            tasks: Default::default(),
+        }
+    }
+
+    /// Wait until there are `max_concurrency` or fewer requests in-flight
+    pub async fn wait_for_capacity(&mut self, max_concurrency: usize) -> 
Result<()> {
+        while self.tasks.len() > max_concurrency {
+            self.tasks.join_next().await.unwrap()??;
+        }
+        Ok(())
+    }
+
+    /// Write data to this [`WriteMultipart`]
+    ///
+    /// Note this method is synchronous (not `async`) and will immediately 
start new uploads
+    /// as soon as the internal `capacity` is hit, regardless of
+    /// how many outstanding uploads are already in progress.
+    ///
+    /// Back pressure can optionally be applied to producers by calling
+    /// [`Self::wait_for_capacity`] prior to calling this method
+    pub fn write(&mut self, mut buf: &[u8]) {
+        while !buf.is_empty() {
+            let capacity = self.buffer.capacity();
+            let remaining = capacity - self.buffer.len();
+            let to_read = buf.len().min(remaining);
+            self.buffer.extend_from_slice(&buf[..to_read]);
+            if to_read == remaining {
+                let part = std::mem::replace(&mut self.buffer, 
Vec::with_capacity(capacity));
+                self.put_part(part.into())
+            }
+            buf = &buf[to_read..]
+        }
+    }
+
+    fn put_part(&mut self, part: Bytes) {
+        self.tasks.spawn(self.upload.put_part(part));
+    }
+
+    /// Abort this upload, attempting to clean up any successfully uploaded 
parts
+    pub async fn abort(mut self) -> Result<()> {
+        self.tasks.shutdown().await;
+        self.upload.abort().await
+    }
+
+    /// Flush final chunk, and await completion of all in-flight requests
+    pub async fn finish(mut self) -> Result<PutResult> {
+        if !self.buffer.is_empty() {
+            let part = std::mem::take(&mut self.buffer);
+            self.put_part(part.into())
+        }
+
+        self.wait_for_capacity(0).await?;
+        self.upload.complete().await
+    }
+}
diff --git a/object_store/tests/get_range_file.rs 
b/object_store/tests/get_range_file.rs
index f73d78578f0..309a86d8fe9 100644
--- a/object_store/tests/get_range_file.rs
+++ b/object_store/tests/get_range_file.rs
@@ -25,7 +25,6 @@ use object_store::path::Path;
 use object_store::*;
 use std::fmt::Formatter;
 use tempfile::tempdir;
-use tokio::io::AsyncWrite;
 
 #[derive(Debug)]
 struct MyStore(LocalFileSystem);
@@ -42,14 +41,7 @@ impl ObjectStore for MyStore {
         self.0.put_opts(path, data, opts).await
     }
 
-    async fn put_multipart(
-        &self,
-        _: &Path,
-    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
-        todo!()
-    }
-
-    async fn abort_multipart(&self, _: &Path, _: &MultipartId) -> Result<()> {
+    async fn put_multipart(&self, _location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
         todo!()
     }
 


Reply via email to