This is an automated email from the ASF dual-hosted git repository.

mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git


The following commit(s) were added to refs/heads/main by this push:
     new cb85a61  refactor: move `put_multipart` to `ObjectStoreExt` (#530)
cb85a61 is described below

commit cb85a6133f31fd2fc0a22d8809f239d398af4916
Author: Marco Neumann <[email protected]>
AuthorDate: Thu Nov 6 14:23:38 2025 +0100

    refactor: move `put_multipart` to `ObjectStoreExt` (#530)
    
    See #385 and #405.
---
 src/aws/mod.rs   |  2 +-
 src/azure/mod.rs |  2 +-
 src/buffered.rs  |  6 +++---
 src/chunked.rs   |  4 ----
 src/gcp/mod.rs   |  2 +-
 src/lib.rs       | 38 ++++++++++++++++++++------------------
 src/limit.rs     |  7 -------
 src/local.rs     |  2 +-
 src/multipart.rs |  4 ++--
 src/prefix.rs    |  5 -----
 src/throttle.rs  |  8 --------
 src/upload.rs    |  2 +-
 12 files changed, 30 insertions(+), 52 deletions(-)

diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index c4e35eb..bcd429d 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -19,7 +19,7 @@
 //!
 //! ## Multipart uploads
 //!
-//! Multipart uploads can be initiated with the [ObjectStore::put_multipart] method.
+//! Multipart uploads can be initiated with the [`ObjectStore::put_multipart_opts`] method.
 //!
 //! If the writer fails for any reason, you may have parts uploaded to AWS but not
 //! used that you will be charged for. [`MultipartUpload::abort`] may be invoked to drop
diff --git a/src/azure/mod.rs b/src/azure/mod.rs
index 2bbee67..3f5c723 100644
--- a/src/azure/mod.rs
+++ b/src/azure/mod.rs
@@ -19,7 +19,7 @@
 //!
 //! ## Streaming uploads
 //!
-//! [ObjectStore::put_multipart] will upload data in blocks and write a blob from those blocks.
+//! [`ObjectStore::put_multipart_opts`] will upload data in blocks and write a blob from those blocks.
 //!
 //! Unused blocks will automatically be dropped after 7 days.
 use crate::{
diff --git a/src/buffered.rs b/src/buffered.rs
index fc235f7..59a858e 100644
--- a/src/buffered.rs
+++ b/src/buffered.rs
@@ -211,12 +211,12 @@ impl AsyncBufRead for BufReader {
 /// An async buffered writer compatible with the tokio IO traits
 ///
 /// This writer adaptively uses [`ObjectStore::put_opts`] or
-/// [`ObjectStore::put_multipart`] depending on the amount of data that has
+/// [`ObjectStore::put_multipart_opts`] depending on the amount of data that has
 /// been written.
 ///
 /// Up to `capacity` bytes will be buffered in memory, and flushed on shutdown
 /// using [`ObjectStore::put_opts`]. If `capacity` is exceeded, data will instead be
-/// streamed using [`ObjectStore::put_multipart`]
+/// streamed using [`ObjectStore::put_multipart_opts`].
 pub struct BufWriter {
     capacity: usize,
     max_concurrency: usize,
@@ -238,7 +238,7 @@ impl std::fmt::Debug for BufWriter {
 enum BufWriterState {
     /// Buffer up to capacity bytes
     Buffer(Path, PutPayloadMut),
-    /// [`ObjectStore::put_multipart`]
+    /// [`ObjectStore::put_multipart_opts`]
     Prepare(BoxFuture<'static, crate::Result<WriteMultipart>>),
     /// Write to a multipart upload
     Write(Option<WriteMultipart>),
diff --git a/src/chunked.rs b/src/chunked.rs
index 6f5416c..1775d95 100644
--- a/src/chunked.rs
+++ b/src/chunked.rs
@@ -71,10 +71,6 @@ impl ObjectStore for ChunkedStore {
         self.inner.put_opts(location, payload, opts).await
     }
 
-    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
-        self.inner.put_multipart(location).await
-    }
-
     async fn put_multipart_opts(
         &self,
         location: &Path,
diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs
index f8a2e0f..befe6ae 100644
--- a/src/gcp/mod.rs
+++ b/src/gcp/mod.rs
@@ -20,7 +20,7 @@
 //! ## Multipart uploads
 //!
 //! [Multipart uploads](https://cloud.google.com/storage/docs/multipart-uploads)
-//! can be initiated with the [ObjectStore::put_multipart] method. If neither
+//! can be initiated with the [`ObjectStore::put_multipart_opts`] method. If neither
 //! [`MultipartUpload::complete`] nor [`MultipartUpload::abort`] is invoked, you may
 //! have parts uploaded to GCS but not used, that you will be charged for. It is recommended
 //! you configure a [lifecycle rule] to abort incomplete multipart uploads after a certain
diff --git a/src/lib.rs b/src/lib.rs
index 2c672e7..55da5d5 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -273,11 +273,12 @@
 //!
 //! # Multipart Upload
 //!
-//! Use the [`ObjectStore::put_multipart`] method to atomically write a large amount of data
+//! Use the [`ObjectStoreExt::put_multipart`] / [`ObjectStore::put_multipart_opts`] method to atomically write a large
+//! amount of data
 //!
 //! ```ignore-wasm32
 //! # use object_store::local::LocalFileSystem;
-//! # use object_store::{ObjectStore, WriteMultipart};
+//! # use object_store::{ObjectStore, ObjectStoreExt, WriteMultipart};
 //! # use std::sync::Arc;
 //! # use bytes::Bytes;
 //! # use tokio::io::AsyncWriteExt;
@@ -638,17 +639,6 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
         opts: PutOptions,
     ) -> Result<PutResult>;
 
-    /// Perform a multipart upload
-    ///
-    /// Client should prefer [`ObjectStoreExt::put`] for small payloads, as streaming uploads
-    /// typically require multiple separate requests. See [`MultipartUpload`] for more information
-    ///
-    /// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore)
-    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
-        self.put_multipart_opts(location, PutMultipartOptions::default())
-            .await
-    }
-
     /// Perform a multipart upload with options
     ///
     /// Client should prefer [`ObjectStore::put_opts`] for small payloads, as streaming uploads
@@ -1117,10 +1107,6 @@ macro_rules! as_ref_impl {
                 self.as_ref().put_opts(location, payload, opts).await
             }
 
-            async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
-                self.as_ref().put_multipart(location).await
-            }
-
             async fn put_multipart_opts(
                 &self,
                 location: &Path,
@@ -1202,7 +1188,7 @@ macro_rules! as_ref_impl {
 as_ref_impl!(Arc<dyn ObjectStore>);
 as_ref_impl!(Box<dyn ObjectStore>);
 
-/// Extension trait for [`ObjectStore`] with convinience functions.
+/// Extension trait for [`ObjectStore`] with convenience functions.
 ///
 /// See "contract" section within the [`ObjectStore`] documentation for more reasoning.
 ///
@@ -1215,6 +1201,17 @@ pub trait ObjectStoreExt: ObjectStore {
     /// write the entirety of `payload` to `location`, or fail. No clients
     /// should be able to observe a partially written object
     fn put(&self, location: &Path, payload: PutPayload) -> impl Future<Output = Result<PutResult>>;
+
+    /// Perform a multipart upload
+    ///
+    /// Client should prefer [`ObjectStoreExt::put`] for small payloads, as streaming uploads
+    /// typically require multiple separate requests. See [`MultipartUpload`] for more information
+    ///
+    /// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore)
+    fn put_multipart(
+        &self,
+        location: &Path,
+    ) -> impl Future<Output = Result<Box<dyn MultipartUpload>>>;
 }
 
 impl<T> ObjectStoreExt for T
@@ -1225,6 +1222,11 @@ where
         self.put_opts(location, payload, PutOptions::default())
             .await
     }
+
+    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
+        self.put_multipart_opts(location, PutMultipartOptions::default())
+            .await
+    }
 }
 
 /// Result of a list call that includes objects, prefixes (directories) and a
diff --git a/src/limit.rs b/src/limit.rs
index 45764e8..dc5f7cc 100644
--- a/src/limit.rs
+++ b/src/limit.rs
@@ -80,13 +80,6 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
         let _permit = self.semaphore.acquire().await.unwrap();
         self.inner.put_opts(location, payload, opts).await
     }
-    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
-        let upload = self.inner.put_multipart(location).await?;
-        Ok(Box::new(LimitUpload {
-            semaphore: Arc::clone(&self.semaphore),
-            upload,
-        }))
-    }
 
     async fn put_multipart_opts(
         &self,
diff --git a/src/local.rs b/src/local.rs
index 44ed654..0571133 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -1719,7 +1719,7 @@ mod not_wasm_tests {
     use tempfile::TempDir;
 
     use crate::local::LocalFileSystem;
-    use crate::{ObjectStore, Path, PutPayload};
+    use crate::{ObjectStoreExt, Path, PutPayload};
 
     #[tokio::test]
     async fn test_cleanup_intermediate_files() {
diff --git a/src/multipart.rs b/src/multipart.rs
index d94e7f1..c084725 100644
--- a/src/multipart.rs
+++ b/src/multipart.rs
@@ -35,11 +35,11 @@ pub struct PartId {
 
 /// A low-level interface for interacting with multipart upload APIs
 ///
-/// Most use-cases should prefer [`ObjectStore::put_multipart`] as this is supported by more
+/// Most use-cases should prefer [`ObjectStore::put_multipart_opts`] as this is supported by more
 /// backends, including [`LocalFileSystem`], and automatically handles uploading fixed
 /// size parts of sufficient size in parallel
 ///
-/// [`ObjectStore::put_multipart`]: crate::ObjectStore::put_multipart
+/// [`ObjectStore::put_multipart_opts`]: crate::ObjectStore::put_multipart_opts
 /// [`LocalFileSystem`]: crate::local::LocalFileSystem
 #[async_trait]
 pub trait MultipartStore: Send + Sync + 'static {
diff --git a/src/prefix.rs b/src/prefix.rs
index 559cd3b..cf4a72d 100644
--- a/src/prefix.rs
+++ b/src/prefix.rs
@@ -104,11 +104,6 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
         self.inner.put_opts(&full_path, payload, opts).await
     }
 
-    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
-        let full_path = self.full_path(location);
-        self.inner.put_multipart(&full_path).await
-    }
-
     async fn put_multipart_opts(
         &self,
         location: &Path,
diff --git a/src/throttle.rs b/src/throttle.rs
index 18b37b8..f6a5389 100644
--- a/src/throttle.rs
+++ b/src/throttle.rs
@@ -158,14 +158,6 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
         self.inner.put_opts(location, payload, opts).await
     }
 
-    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
-        let upload = self.inner.put_multipart(location).await?;
-        Ok(Box::new(ThrottledUpload {
-            upload,
-            sleep: self.config().wait_put_per_call,
-        }))
-    }
-
     async fn put_multipart_opts(
         &self,
         location: &Path,
diff --git a/src/upload.rs b/src/upload.rs
index a27d0dd..33698a4 100644
--- a/src/upload.rs
+++ b/src/upload.rs
@@ -248,7 +248,7 @@ mod tests {
     use rand::prelude::StdRng;
     use rand::{Rng, SeedableRng};
 
-    use crate::ObjectStore;
+    use crate::ObjectStoreExt;
     use crate::memory::InMemory;
     use crate::path::Path;
     use crate::throttle::{ThrottleConfig, ThrottledStore};

Reply via email to