This is an automated email from the ASF dual-hosted git repository.
mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new cb85a61 refactor: move `put_multipart` to `ObjectStoreExt` (#530)
cb85a61 is described below
commit cb85a6133f31fd2fc0a22d8809f239d398af4916
Author: Marco Neumann <[email protected]>
AuthorDate: Thu Nov 6 14:23:38 2025 +0100
refactor: move `put_multipart` to `ObjectStoreExt` (#530)
See #385 and #405.
---
src/aws/mod.rs | 2 +-
src/azure/mod.rs | 2 +-
src/buffered.rs | 6 +++---
src/chunked.rs | 4 ----
src/gcp/mod.rs | 2 +-
src/lib.rs | 38 ++++++++++++++++++++------------------
src/limit.rs | 7 -------
src/local.rs | 2 +-
src/multipart.rs | 4 ++--
src/prefix.rs | 5 -----
src/throttle.rs | 8 --------
src/upload.rs | 2 +-
12 files changed, 30 insertions(+), 52 deletions(-)
diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index c4e35eb..bcd429d 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -19,7 +19,7 @@
//!
//! ## Multipart uploads
//!
-//! Multipart uploads can be initiated with the [ObjectStore::put_multipart] method.
+//! Multipart uploads can be initiated with the [`ObjectStore::put_multipart_opts`] method.
//!
//! If the writer fails for any reason, you may have parts uploaded to AWS but not
//! used that you will be charged for. [`MultipartUpload::abort`] may be invoked to drop
diff --git a/src/azure/mod.rs b/src/azure/mod.rs
index 2bbee67..3f5c723 100644
--- a/src/azure/mod.rs
+++ b/src/azure/mod.rs
@@ -19,7 +19,7 @@
//!
//! ## Streaming uploads
//!
-//! [ObjectStore::put_multipart] will upload data in blocks and write a blob from those blocks.
+//! [`ObjectStore::put_multipart_opts`] will upload data in blocks and write a blob from those blocks.
//!
//! Unused blocks will automatically be dropped after 7 days.
use crate::{
diff --git a/src/buffered.rs b/src/buffered.rs
index fc235f7..59a858e 100644
--- a/src/buffered.rs
+++ b/src/buffered.rs
@@ -211,12 +211,12 @@ impl AsyncBufRead for BufReader {
/// An async buffered writer compatible with the tokio IO traits
///
/// This writer adaptively uses [`ObjectStore::put_opts`] or
-/// [`ObjectStore::put_multipart`] depending on the amount of data that has
+/// [`ObjectStore::put_multipart_opts`] depending on the amount of data that has
/// been written.
///
/// Up to `capacity` bytes will be buffered in memory, and flushed on shutdown
/// using [`ObjectStore::put_opts`]. If `capacity` is exceeded, data will instead be
-/// streamed using [`ObjectStore::put_multipart`]
+/// streamed using [`ObjectStore::put_multipart_opts`].
pub struct BufWriter {
capacity: usize,
max_concurrency: usize,
@@ -238,7 +238,7 @@ impl std::fmt::Debug for BufWriter {
enum BufWriterState {
/// Buffer up to capacity bytes
Buffer(Path, PutPayloadMut),
- /// [`ObjectStore::put_multipart`]
+ /// [`ObjectStore::put_multipart_opts`]
Prepare(BoxFuture<'static, crate::Result<WriteMultipart>>),
/// Write to a multipart upload
Write(Option<WriteMultipart>),
diff --git a/src/chunked.rs b/src/chunked.rs
index 6f5416c..1775d95 100644
--- a/src/chunked.rs
+++ b/src/chunked.rs
@@ -71,10 +71,6 @@ impl ObjectStore for ChunkedStore {
self.inner.put_opts(location, payload, opts).await
}
- async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
- self.inner.put_multipart(location).await
- }
-
async fn put_multipart_opts(
&self,
location: &Path,
diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs
index f8a2e0f..befe6ae 100644
--- a/src/gcp/mod.rs
+++ b/src/gcp/mod.rs
@@ -20,7 +20,7 @@
//! ## Multipart uploads
//!
//! [Multipart uploads](https://cloud.google.com/storage/docs/multipart-uploads)
-//! can be initiated with the [ObjectStore::put_multipart] method. If neither
+//! can be initiated with the [`ObjectStore::put_multipart_opts`] method. If neither
//! [`MultipartUpload::complete`] nor [`MultipartUpload::abort`] is invoked, you may
//! have parts uploaded to GCS but not used, that you will be charged for. It is recommended
//! you configure a [lifecycle rule] to abort incomplete multipart uploads after a certain
diff --git a/src/lib.rs b/src/lib.rs
index 2c672e7..55da5d5 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -273,11 +273,12 @@
//!
//! # Multipart Upload
//!
-//! Use the [`ObjectStore::put_multipart`] method to atomically write a large amount of data
+//! Use the [`ObjectStoreExt::put_multipart`] / [`ObjectStore::put_multipart_opts`] method to atomically write a large
+//! amount of data
//!
//! ```ignore-wasm32
//! # use object_store::local::LocalFileSystem;
-//! # use object_store::{ObjectStore, WriteMultipart};
+//! # use object_store::{ObjectStore, ObjectStoreExt, WriteMultipart};
//! # use std::sync::Arc;
//! # use bytes::Bytes;
//! # use tokio::io::AsyncWriteExt;
@@ -638,17 +639,6 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
opts: PutOptions,
) -> Result<PutResult>;
- /// Perform a multipart upload
- ///
- /// Client should prefer [`ObjectStoreExt::put`] for small payloads, as streaming uploads
- /// typically require multiple separate requests. See [`MultipartUpload`] for more information
- ///
- /// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore)
- async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
- self.put_multipart_opts(location, PutMultipartOptions::default())
- .await
- }
-
/// Perform a multipart upload with options
///
/// Client should prefer [`ObjectStore::put_opts`] for small payloads, as streaming uploads
@@ -1117,10 +1107,6 @@ macro_rules! as_ref_impl {
self.as_ref().put_opts(location, payload, opts).await
}
- async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
- self.as_ref().put_multipart(location).await
- }
-
async fn put_multipart_opts(
&self,
location: &Path,
@@ -1202,7 +1188,7 @@ macro_rules! as_ref_impl {
as_ref_impl!(Arc<dyn ObjectStore>);
as_ref_impl!(Box<dyn ObjectStore>);
-/// Extension trait for [`ObjectStore`] with convinience functions.
+/// Extension trait for [`ObjectStore`] with convenience functions.
///
/// See "contract" section within the [`ObjectStore`] documentation for more reasoning.
///
@@ -1215,6 +1201,17 @@ pub trait ObjectStoreExt: ObjectStore {
/// write the entirety of `payload` to `location`, or fail. No clients
/// should be able to observe a partially written object
fn put(&self, location: &Path, payload: PutPayload) -> impl Future<Output = Result<PutResult>>;
+
+ /// Perform a multipart upload
+ ///
+ /// Client should prefer [`ObjectStoreExt::put`] for small payloads, as streaming uploads
+ /// typically require multiple separate requests. See [`MultipartUpload`] for more information
+ ///
+ /// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore)
+ fn put_multipart(
+ &self,
+ location: &Path,
+ ) -> impl Future<Output = Result<Box<dyn MultipartUpload>>>;
}
impl<T> ObjectStoreExt for T
@@ -1225,6 +1222,11 @@ where
self.put_opts(location, payload, PutOptions::default())
.await
}
+
+ async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
+ self.put_multipart_opts(location, PutMultipartOptions::default())
+ .await
+ }
}
/// Result of a list call that includes objects, prefixes (directories) and a
diff --git a/src/limit.rs b/src/limit.rs
index 45764e8..dc5f7cc 100644
--- a/src/limit.rs
+++ b/src/limit.rs
@@ -80,13 +80,6 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.put_opts(location, payload, opts).await
}
- async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
- let upload = self.inner.put_multipart(location).await?;
- Ok(Box::new(LimitUpload {
- semaphore: Arc::clone(&self.semaphore),
- upload,
- }))
- }
async fn put_multipart_opts(
&self,
diff --git a/src/local.rs b/src/local.rs
index 44ed654..0571133 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -1719,7 +1719,7 @@ mod not_wasm_tests {
use tempfile::TempDir;
use crate::local::LocalFileSystem;
- use crate::{ObjectStore, Path, PutPayload};
+ use crate::{ObjectStoreExt, Path, PutPayload};
#[tokio::test]
async fn test_cleanup_intermediate_files() {
diff --git a/src/multipart.rs b/src/multipart.rs
index d94e7f1..c084725 100644
--- a/src/multipart.rs
+++ b/src/multipart.rs
@@ -35,11 +35,11 @@ pub struct PartId {
/// A low-level interface for interacting with multipart upload APIs
///
-/// Most use-cases should prefer [`ObjectStore::put_multipart`] as this is supported by more
+/// Most use-cases should prefer [`ObjectStore::put_multipart_opts`] as this is supported by more
/// backends, including [`LocalFileSystem`], and automatically handles uploading fixed
/// size parts of sufficient size in parallel
///
-/// [`ObjectStore::put_multipart`]: crate::ObjectStore::put_multipart
+/// [`ObjectStore::put_multipart_opts`]: crate::ObjectStore::put_multipart_opts
/// [`LocalFileSystem`]: crate::local::LocalFileSystem
#[async_trait]
pub trait MultipartStore: Send + Sync + 'static {
diff --git a/src/prefix.rs b/src/prefix.rs
index 559cd3b..cf4a72d 100644
--- a/src/prefix.rs
+++ b/src/prefix.rs
@@ -104,11 +104,6 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
self.inner.put_opts(&full_path, payload, opts).await
}
- async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
- let full_path = self.full_path(location);
- self.inner.put_multipart(&full_path).await
- }
-
async fn put_multipart_opts(
&self,
location: &Path,
diff --git a/src/throttle.rs b/src/throttle.rs
index 18b37b8..f6a5389 100644
--- a/src/throttle.rs
+++ b/src/throttle.rs
@@ -158,14 +158,6 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
self.inner.put_opts(location, payload, opts).await
}
- async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
- let upload = self.inner.put_multipart(location).await?;
- Ok(Box::new(ThrottledUpload {
- upload,
- sleep: self.config().wait_put_per_call,
- }))
- }
-
async fn put_multipart_opts(
&self,
location: &Path,
diff --git a/src/upload.rs b/src/upload.rs
index a27d0dd..33698a4 100644
--- a/src/upload.rs
+++ b/src/upload.rs
@@ -248,7 +248,7 @@ mod tests {
use rand::prelude::StdRng;
use rand::{Rng, SeedableRng};
- use crate::ObjectStore;
+ use crate::ObjectStoreExt;
use crate::memory::InMemory;
use crate::path::Path;
use crate::throttle::{ThrottleConfig, ThrottledStore};