This is an automated email from the ASF dual-hosted git repository.

mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git


The following commit(s) were added to refs/heads/main by this push:
     new 179a087  refactor: introduce `ObjectStoreExt` trait (#405)
179a087 is described below

commit 179a0875ab071ed83b22809d2955348e7e2ccb9a
Author: Marco Neumann <[email protected]>
AuthorDate: Wed Nov 5 15:18:32 2025 +0100

    refactor: introduce `ObjectStoreExt` trait (#405)
    
    See #385.
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 src/aws/mod.rs     |  1 +
 src/azure/mod.rs   |  1 +
 src/buffered.rs    |  8 +++----
 src/chunked.rs     |  1 +
 src/gcp/mod.rs     |  1 +
 src/integration.rs |  2 +-
 src/lib.rs         | 68 +++++++++++++++++++++++++++++++++++++-----------------
 src/limit.rs       |  5 ----
 src/local.rs       |  2 +-
 src/memory.rs      |  2 +-
 src/prefix.rs      |  7 +-----
 src/throttle.rs    |  8 ++-----
 12 files changed, 61 insertions(+), 45 deletions(-)

diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index 586d40b..c4e35eb 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -498,6 +498,7 @@ impl PaginatedListStore for AmazonS3 {
 mod tests {
     use super::*;
     use crate::ClientOptions;
+    use crate::ObjectStoreExt;
     use crate::client::SpawnedReqwestConnector;
     use crate::client::get::GetClient;
     use crate::client::retry::RetryContext;
diff --git a/src/azure/mod.rs b/src/azure/mod.rs
index 237ef25..2bbee67 100644
--- a/src/azure/mod.rs
+++ b/src/azure/mod.rs
@@ -311,6 +311,7 @@ impl PaginatedListStore for MicrosoftAzure {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::ObjectStoreExt;
     use crate::integration::*;
     use crate::tests::*;
     use bytes::Bytes;
diff --git a/src/buffered.rs b/src/buffered.rs
index 9ec3285..fc235f7 100644
--- a/src/buffered.rs
+++ b/src/buffered.rs
@@ -210,12 +210,12 @@ impl AsyncBufRead for BufReader {
 
 /// An async buffered writer compatible with the tokio IO traits
 ///
-/// This writer adaptively uses [`ObjectStore::put`] or
+/// This writer adaptively uses [`ObjectStore::put_opts`] or
 /// [`ObjectStore::put_multipart`] depending on the amount of data that has
 /// been written.
 ///
 /// Up to `capacity` bytes will be buffered in memory, and flushed on shutdown
-/// using [`ObjectStore::put`]. If `capacity` is exceeded, data will instead be
+/// using [`ObjectStore::put_opts`]. If `capacity` is exceeded, data will instead be
 /// streamed using [`ObjectStore::put_multipart`]
 pub struct BufWriter {
     capacity: usize,
@@ -242,7 +242,7 @@ enum BufWriterState {
     Prepare(BoxFuture<'static, crate::Result<WriteMultipart>>),
     /// Write to a multipart upload
     Write(Option<WriteMultipart>),
-    /// [`ObjectStore::put`]
+    /// [`ObjectStore::put_opts`]
     Flush(BoxFuture<'static, crate::Result<()>>),
 }
 
@@ -489,7 +489,7 @@ mod tests {
     use super::*;
     use crate::memory::InMemory;
     use crate::path::Path;
-    use crate::{Attribute, GetOptions};
+    use crate::{Attribute, GetOptions, ObjectStoreExt};
     use itertools::Itertools;
     use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
 
diff --git a/src/chunked.rs b/src/chunked.rs
index eacb667..6f5416c 100644
--- a/src/chunked.rs
+++ b/src/chunked.rs
@@ -186,6 +186,7 @@ impl ObjectStore for ChunkedStore {
 mod tests {
     use futures::StreamExt;
 
+    use crate::ObjectStoreExt;
     #[cfg(feature = "fs")]
     use crate::integration::*;
     #[cfg(feature = "fs")]
diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs
index 0b18108..f8a2e0f 100644
--- a/src/gcp/mod.rs
+++ b/src/gcp/mod.rs
@@ -302,6 +302,7 @@ impl PaginatedListStore for GoogleCloudStorage {
 mod test {
     use credential::DEFAULT_GCS_BASE_URL;
 
+    use crate::ObjectStoreExt;
     use crate::integration::*;
     use crate::tests::*;
 
diff --git a/src/integration.rs b/src/integration.rs
index e4e73e9..bc79f69 100644
--- a/src/integration.rs
+++ b/src/integration.rs
@@ -29,7 +29,7 @@ use crate::multipart::MultipartStore;
 use crate::path::Path;
 use crate::{
     Attribute, Attributes, DynObjectStore, Error, GetOptions, GetRange, MultipartUpload,
-    ObjectStore, PutMode, PutPayload, UpdateVersion, WriteMultipart,
+    ObjectStore, ObjectStoreExt, PutMode, PutPayload, UpdateVersion, WriteMultipart,
 };
 use bytes::Bytes;
 use futures::stream::FuturesUnordered;
diff --git a/src/lib.rs b/src/lib.rs
index 2e059fc..2c672e7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -252,11 +252,11 @@
 //!
 //! # Put Object
 //!
-//! Use the [`ObjectStore::put`] method to atomically write data.
+//! Use the [`ObjectStoreExt::put`] method to atomically write data.
 //!
 //! ```ignore-wasm32
 //! # use object_store::local::LocalFileSystem;
-//! # use object_store::{ObjectStore, PutPayload};
+//! # use object_store::{ObjectStore, ObjectStoreExt, PutPayload};
 //! # use std::sync::Arc;
 //! # use object_store::path::Path;
 //! # fn get_object_store() -> Arc<dyn ObjectStore> {
@@ -364,7 +364,7 @@
 //!
 //! ```ignore-wasm32
 //! # use object_store::local::LocalFileSystem;
-//! # use object_store::{ObjectStore, PutPayloadMut};
+//! # use object_store::{ObjectStore, ObjectStoreExt, PutPayloadMut};
 //! # use std::sync::Arc;
 //! # use bytes::Bytes;
 //! # use tokio::io::AsyncWriteExt;
@@ -613,19 +613,24 @@ pub type DynObjectStore = dyn ObjectStore;
 pub type MultipartId = String;
 
 /// Universal API to multiple object store services.
+///
+/// For more convenience methods, check [`ObjectStoreExt`].
+///
+/// # Contract
+/// This trait is meant as a contract between object store implementations
+/// (e.g. providers, wrappers) and the `object_store` crate itself and is
+/// intended to be the minimum API required for an object store.
+///
+/// The [`ObjectStoreExt`] acts as an API/contract between `object_store`
+/// and the store users and provides additional methods that may be simpler to use but overlap
+/// in functionality with `ObjectStore`
 #[async_trait]
 pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
-    /// Save the provided bytes to the specified location
+    /// Save the provided `payload` to `location` with the given options
     ///
     /// The operation is guaranteed to be atomic, it will either successfully
     /// write the entirety of `payload` to `location`, or fail. No clients
     /// should be able to observe a partially written object
-    async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
-        self.put_opts(location, payload, PutOptions::default())
-            .await
-    }
-
-    /// Save the provided `payload` to `location` with the given options
     async fn put_opts(
         &self,
         location: &Path,
@@ -635,7 +640,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + 
Debug + 'static {
 
     /// Perform a multipart upload
     ///
-    /// Client should prefer [`ObjectStore::put`] for small payloads, as streaming uploads
+    /// Client should prefer [`ObjectStoreExt::put`] for small payloads, as streaming uploads
     /// typically require multiple separate requests. See [`MultipartUpload`] for more information
     ///
     /// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore)
@@ -646,7 +651,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + 
Debug + 'static {
 
     /// Perform a multipart upload with options
     ///
-    /// Client should prefer [`ObjectStore::put`] for small payloads, as 
streaming uploads
+    /// Client should prefer [`ObjectStore::put_opts`] for small payloads, as 
streaming uploads
     /// typically require multiple separate requests. See [`MultipartUpload`] 
for more information
     ///
     /// For more advanced multipart uploads see 
[`MultipartStore`](multipart::MultipartStore)
@@ -665,7 +670,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + 
Debug + 'static {
     /// ```ignore-wasm32
     /// # use object_store::local::LocalFileSystem;
     /// # use tempfile::tempdir;
-    /// # use object_store::{path::Path, ObjectStore};
+    /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt};
     /// async fn get_example() {
     ///     let tmp = tempdir().unwrap();
     ///     let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
@@ -699,7 +704,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + 
Debug + 'static {
     /// ```ignore-wasm32
     /// # use object_store::local::LocalFileSystem;
     /// # use tempfile::tempdir;
-    /// # use object_store::{path::Path, ObjectStore, GetOptions};
+    /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt, 
GetOptions};
     /// async fn get_opts_example() {
     ///     let tmp = tempdir().unwrap();
     ///     let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
@@ -756,7 +761,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + 
Debug + 'static {
     /// ```ignore-wasm32
     /// # use object_store::local::LocalFileSystem;
     /// # use tempfile::tempdir;
-    /// # use object_store::{path::Path, ObjectStore, GetOptions};
+    /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt, 
GetOptions};
     /// async fn get_opts_range_example() {
     ///     let tmp = tempdir().unwrap();
     ///     let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
@@ -824,7 +829,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + 
Debug + 'static {
     /// ```ignore-wasm32
     /// # use object_store::local::LocalFileSystem;
     /// # use tempfile::tempdir;
-    /// # use object_store::{path::Path, ObjectStore};
+    /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt};
     /// async fn get_range_example() {
     ///     let tmp = tempdir().unwrap();
     ///     let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
@@ -892,7 +897,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + 
Debug + 'static {
     /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
     /// # let root = tempfile::TempDir::new().unwrap();
     /// # let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
-    /// # use object_store::{ObjectStore, ObjectMeta};
+    /// # use object_store::{ObjectStore, ObjectStoreExt, ObjectMeta};
     /// # use object_store::path::Path;
     /// # use futures::{StreamExt, TryStreamExt};
     /// #
@@ -1103,10 +1108,6 @@ macro_rules! as_ref_impl {
     ($type:ty) => {
         #[async_trait]
         impl ObjectStore for $type {
-            async fn put(&self, location: &Path, payload: PutPayload) -> 
Result<PutResult> {
-                self.as_ref().put(location, payload).await
-            }
-
             async fn put_opts(
                 &self,
                 location: &Path,
@@ -1201,6 +1202,31 @@ macro_rules! as_ref_impl {
 as_ref_impl!(Arc<dyn ObjectStore>);
 as_ref_impl!(Box<dyn ObjectStore>);
 
+/// Extension trait for [`ObjectStore`] with convenience functions.
+///
+/// See "contract" section within the [`ObjectStore`] documentation for more reasoning.
+///
+/// # Implementation
+/// You MUST NOT implement this trait yourself. It is automatically implemented for all [`ObjectStore`] implementations.
+pub trait ObjectStoreExt: ObjectStore {
+    /// Save the provided bytes to the specified location
+    ///
+    /// The operation is guaranteed to be atomic, it will either successfully
+    /// write the entirety of `payload` to `location`, or fail. No clients
+    /// should be able to observe a partially written object
+    fn put(&self, location: &Path, payload: PutPayload) -> impl Future<Output = Result<PutResult>>;
+}
+
+impl<T> ObjectStoreExt for T
+where
+    T: ObjectStore + ?Sized,
+{
+    async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
+        self.put_opts(location, payload, PutOptions::default())
+            .await
+    }
+}
+
 /// Result of a list call that includes objects, prefixes (directories) and a
 /// token for the next set of results. Individual result sets may be limited to
 /// 1,000 objects based on the underlying object storage's limitations.
diff --git a/src/limit.rs b/src/limit.rs
index 7102f06..45764e8 100644
--- a/src/limit.rs
+++ b/src/limit.rs
@@ -71,11 +71,6 @@ impl<T: ObjectStore> std::fmt::Display for LimitStore<T> {
 
 #[async_trait]
 impl<T: ObjectStore> ObjectStore for LimitStore<T> {
-    async fn put(&self, location: &Path, payload: PutPayload) -> 
Result<PutResult> {
-        let _permit = self.semaphore.acquire().await.unwrap();
-        self.inner.put(location, payload).await
-    }
-
     async fn put_opts(
         &self,
         location: &Path,
diff --git a/src/local.rs b/src/local.rs
index 31db111..44ed654 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -1124,7 +1124,7 @@ mod tests {
     #[cfg(target_family = "unix")]
     use tempfile::NamedTempFile;
 
-    use crate::integration::*;
+    use crate::{ObjectStoreExt, integration::*};
 
     use super::*;
 
diff --git a/src/memory.rs b/src/memory.rs
index 55d75fd..de9113e 100644
--- a/src/memory.rs
+++ b/src/memory.rs
@@ -550,7 +550,7 @@ impl MultipartUpload for InMemoryUpload {
 
 #[cfg(test)]
 mod tests {
-    use crate::integration::*;
+    use crate::{ObjectStoreExt, integration::*};
 
     use super::*;
 
diff --git a/src/prefix.rs b/src/prefix.rs
index a455826..559cd3b 100644
--- a/src/prefix.rs
+++ b/src/prefix.rs
@@ -94,11 +94,6 @@ fn strip_meta(prefix: &Path, meta: ObjectMeta) -> ObjectMeta 
{
 
 #[async_trait::async_trait]
 impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
-    async fn put(&self, location: &Path, payload: PutPayload) -> 
Result<PutResult> {
-        let full_path = self.full_path(location);
-        self.inner.put(&full_path, payload).await
-    }
-
     async fn put_opts(
         &self,
         location: &Path,
@@ -238,8 +233,8 @@ mod tests {
     use std::slice;
 
     use super::*;
-    use crate::integration::*;
     use crate::local::LocalFileSystem;
+    use crate::{ObjectStoreExt, integration::*};
 
     use tempfile::TempDir;
 
diff --git a/src/throttle.rs b/src/throttle.rs
index 9419ae8..18b37b8 100644
--- a/src/throttle.rs
+++ b/src/throttle.rs
@@ -90,7 +90,7 @@ pub struct ThrottleConfig {
     /// 
[`wait_list_with_delimiter_per_call`](Self::wait_list_with_delimiter_per_call).
     pub wait_list_with_delimiter_per_entry: Duration,
 
-    /// Sleep duration for every call to [`put`](ThrottledStore::put).
+    /// Sleep duration for every call to 
[`put_opts`](ThrottledStore::put_opts).
     ///
     /// Sleeping is done before the underlying store is called and 
independently of the success of
     /// the operation.
@@ -148,11 +148,6 @@ impl<T: ObjectStore> std::fmt::Display for 
ThrottledStore<T> {
 
 #[async_trait]
 impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
-    async fn put(&self, location: &Path, payload: PutPayload) -> 
Result<PutResult> {
-        sleep(self.config().wait_put_per_call).await;
-        self.inner.put(location, payload).await
-    }
-
     async fn put_opts(
         &self,
         location: &Path,
@@ -419,6 +414,7 @@ mod tests {
     use super::*;
     #[cfg(target_os = "linux")]
     use crate::GetResultPayload;
+    use crate::ObjectStoreExt;
     use crate::{integration::*, memory::InMemory};
     use futures::TryStreamExt;
     use tokio::time::Duration;

Reply via email to