This is an automated email from the ASF dual-hosted git repository.
mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new 179a087 refactor: introduce `ObjectStoreExt` trait (#405)
179a087 is described below
commit 179a0875ab071ed83b22809d2955348e7e2ccb9a
Author: Marco Neumann <[email protected]>
AuthorDate: Wed Nov 5 15:18:32 2025 +0100
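refactor: introduce `ObjectStoreExt` trait (#405)
See #385.
Moves the provided `put` method out of `ObjectStore` into a new extension trait, `ObjectStoreExt`, which is blanket-implemented for every `ObjectStore`. Store implementations now only provide `put_opts`, and code that calls `put` must bring `ObjectStoreExt` into scope.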
Co-authored-by: Andrew Lamb <[email protected]>
---
src/aws/mod.rs | 1 +
src/azure/mod.rs | 1 +
src/buffered.rs | 8 +++----
src/chunked.rs | 1 +
src/gcp/mod.rs | 1 +
src/integration.rs | 2 +-
src/lib.rs | 68 +++++++++++++++++++++++++++++++++++++-----------------
src/limit.rs | 5 ----
src/local.rs | 2 +-
src/memory.rs | 2 +-
src/prefix.rs | 7 +-----
src/throttle.rs | 8 ++-----
12 files changed, 61 insertions(+), 45 deletions(-)
diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index 586d40b..c4e35eb 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -498,6 +498,7 @@ impl PaginatedListStore for AmazonS3 {
mod tests {
use super::*;
use crate::ClientOptions;
+ use crate::ObjectStoreExt;
use crate::client::SpawnedReqwestConnector;
use crate::client::get::GetClient;
use crate::client::retry::RetryContext;
diff --git a/src/azure/mod.rs b/src/azure/mod.rs
index 237ef25..2bbee67 100644
--- a/src/azure/mod.rs
+++ b/src/azure/mod.rs
@@ -311,6 +311,7 @@ impl PaginatedListStore for MicrosoftAzure {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::ObjectStoreExt;
use crate::integration::*;
use crate::tests::*;
use bytes::Bytes;
diff --git a/src/buffered.rs b/src/buffered.rs
index 9ec3285..fc235f7 100644
--- a/src/buffered.rs
+++ b/src/buffered.rs
@@ -210,12 +210,12 @@ impl AsyncBufRead for BufReader {
/// An async buffered writer compatible with the tokio IO traits
///
-/// This writer adaptively uses [`ObjectStore::put`] or
+/// This writer adaptively uses [`ObjectStore::put_opts`] or
/// [`ObjectStore::put_multipart`] depending on the amount of data that has
/// been written.
///
/// Up to `capacity` bytes will be buffered in memory, and flushed on shutdown
-/// using [`ObjectStore::put`]. If `capacity` is exceeded, data will instead be
+/// using [`ObjectStore::put_opts`]. If `capacity` is exceeded, data will instead be
/// streamed using [`ObjectStore::put_multipart`]
pub struct BufWriter {
capacity: usize,
@@ -242,7 +242,7 @@ enum BufWriterState {
Prepare(BoxFuture<'static, crate::Result<WriteMultipart>>),
/// Write to a multipart upload
Write(Option<WriteMultipart>),
- /// [`ObjectStore::put`]
+ /// [`ObjectStore::put_opts`]
Flush(BoxFuture<'static, crate::Result<()>>),
}
@@ -489,7 +489,7 @@ mod tests {
use super::*;
use crate::memory::InMemory;
use crate::path::Path;
- use crate::{Attribute, GetOptions};
+ use crate::{Attribute, GetOptions, ObjectStoreExt};
use itertools::Itertools;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt,
AsyncWriteExt};
diff --git a/src/chunked.rs b/src/chunked.rs
index eacb667..6f5416c 100644
--- a/src/chunked.rs
+++ b/src/chunked.rs
@@ -186,6 +186,7 @@ impl ObjectStore for ChunkedStore {
mod tests {
use futures::StreamExt;
+ use crate::ObjectStoreExt;
#[cfg(feature = "fs")]
use crate::integration::*;
#[cfg(feature = "fs")]
diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs
index 0b18108..f8a2e0f 100644
--- a/src/gcp/mod.rs
+++ b/src/gcp/mod.rs
@@ -302,6 +302,7 @@ impl PaginatedListStore for GoogleCloudStorage {
mod test {
use credential::DEFAULT_GCS_BASE_URL;
+ use crate::ObjectStoreExt;
use crate::integration::*;
use crate::tests::*;
diff --git a/src/integration.rs b/src/integration.rs
index e4e73e9..bc79f69 100644
--- a/src/integration.rs
+++ b/src/integration.rs
@@ -29,7 +29,7 @@ use crate::multipart::MultipartStore;
use crate::path::Path;
use crate::{
Attribute, Attributes, DynObjectStore, Error, GetOptions, GetRange,
MultipartUpload,
- ObjectStore, PutMode, PutPayload, UpdateVersion, WriteMultipart,
+ ObjectStore, ObjectStoreExt, PutMode, PutPayload, UpdateVersion,
WriteMultipart,
};
use bytes::Bytes;
use futures::stream::FuturesUnordered;
diff --git a/src/lib.rs b/src/lib.rs
index 2e059fc..2c672e7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -252,11 +252,11 @@
//!
//! # Put Object
//!
-//! Use the [`ObjectStore::put`] method to atomically write data.
+//! Use the [`ObjectStoreExt::put`] method to atomically write data.
//!
//! ```ignore-wasm32
//! # use object_store::local::LocalFileSystem;
-//! # use object_store::{ObjectStore, PutPayload};
+//! # use object_store::{ObjectStore, ObjectStoreExt, PutPayload};
//! # use std::sync::Arc;
//! # use object_store::path::Path;
//! # fn get_object_store() -> Arc<dyn ObjectStore> {
@@ -364,7 +364,7 @@
//!
//! ```ignore-wasm32
//! # use object_store::local::LocalFileSystem;
-//! # use object_store::{ObjectStore, PutPayloadMut};
+//! # use object_store::{ObjectStore, ObjectStoreExt, PutPayloadMut};
//! # use std::sync::Arc;
//! # use bytes::Bytes;
//! # use tokio::io::AsyncWriteExt;
@@ -613,19 +613,24 @@ pub type DynObjectStore = dyn ObjectStore;
pub type MultipartId = String;
/// Universal API to multiple object store services.
+///
+/// For more convenience methods, see [`ObjectStoreExt`].
+///
+/// # Contract
+/// This trait is meant as a contract between object store implementations
+/// (e.g. providers, wrappers) and the `object_store` crate itself, and is
+/// intended to be the minimum API required for an object store.
+///
+/// The [`ObjectStoreExt`] trait acts as an API/contract between `object_store`
+/// and store users, and provides additional methods that may be simpler to use
+/// but overlap in functionality with `ObjectStore`.
#[async_trait]
pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
- /// Save the provided bytes to the specified location
+ /// Save the provided `payload` to `location` with the given options
///
/// The operation is guaranteed to be atomic, it will either successfully
/// write the entirety of `payload` to `location`, or fail. No clients
/// should be able to observe a partially written object
- async fn put(&self, location: &Path, payload: PutPayload) ->
Result<PutResult> {
- self.put_opts(location, payload, PutOptions::default())
- .await
- }
-
- /// Save the provided `payload` to `location` with the given options
async fn put_opts(
&self,
location: &Path,
@@ -635,7 +640,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync +
Debug + 'static {
/// Perform a multipart upload
///
- /// Client should prefer [`ObjectStore::put`] for small payloads, as
streaming uploads
+ /// Clients should prefer [`ObjectStoreExt::put`] for small payloads, as streaming uploads
/// typically require multiple separate requests. See [`MultipartUpload`]
for more information
///
/// For more advanced multipart uploads see
[`MultipartStore`](multipart::MultipartStore)
@@ -646,7 +651,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync +
Debug + 'static {
/// Perform a multipart upload with options
///
- /// Client should prefer [`ObjectStore::put`] for small payloads, as
streaming uploads
+ /// Clients should prefer [`ObjectStore::put_opts`] for small payloads, as streaming uploads
/// typically require multiple separate requests. See [`MultipartUpload`]
for more information
///
/// For more advanced multipart uploads see
[`MultipartStore`](multipart::MultipartStore)
@@ -665,7 +670,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync +
Debug + 'static {
/// ```ignore-wasm32
/// # use object_store::local::LocalFileSystem;
/// # use tempfile::tempdir;
- /// # use object_store::{path::Path, ObjectStore};
+ /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt};
/// async fn get_example() {
/// let tmp = tempdir().unwrap();
/// let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
@@ -699,7 +704,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync +
Debug + 'static {
/// ```ignore-wasm32
/// # use object_store::local::LocalFileSystem;
/// # use tempfile::tempdir;
- /// # use object_store::{path::Path, ObjectStore, GetOptions};
+ /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt,
GetOptions};
/// async fn get_opts_example() {
/// let tmp = tempdir().unwrap();
/// let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
@@ -756,7 +761,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync +
Debug + 'static {
/// ```ignore-wasm32
/// # use object_store::local::LocalFileSystem;
/// # use tempfile::tempdir;
- /// # use object_store::{path::Path, ObjectStore, GetOptions};
+ /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt,
GetOptions};
/// async fn get_opts_range_example() {
/// let tmp = tempdir().unwrap();
/// let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
@@ -824,7 +829,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync +
Debug + 'static {
/// ```ignore-wasm32
/// # use object_store::local::LocalFileSystem;
/// # use tempfile::tempdir;
- /// # use object_store::{path::Path, ObjectStore};
+ /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt};
/// async fn get_range_example() {
/// let tmp = tempdir().unwrap();
/// let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
@@ -892,7 +897,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync +
Debug + 'static {
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// # let root = tempfile::TempDir::new().unwrap();
/// # let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
- /// # use object_store::{ObjectStore, ObjectMeta};
+ /// # use object_store::{ObjectStore, ObjectStoreExt, ObjectMeta};
/// # use object_store::path::Path;
/// # use futures::{StreamExt, TryStreamExt};
/// #
@@ -1103,10 +1108,6 @@ macro_rules! as_ref_impl {
($type:ty) => {
#[async_trait]
impl ObjectStore for $type {
- async fn put(&self, location: &Path, payload: PutPayload) ->
Result<PutResult> {
- self.as_ref().put(location, payload).await
- }
-
async fn put_opts(
&self,
location: &Path,
@@ -1201,6 +1202,31 @@ macro_rules! as_ref_impl {
as_ref_impl!(Arc<dyn ObjectStore>);
as_ref_impl!(Box<dyn ObjectStore>);
+/// Extension trait for [`ObjectStore`] with convenience functions.
+///
+/// See the "Contract" section of the [`ObjectStore`] documentation for the
+/// reasoning behind keeping these methods out of [`ObjectStore`] itself.
+///
+/// # Implementation
+/// You MUST NOT implement this trait yourself. It is automatically implemented
+/// for all [`ObjectStore`] implementations (including unsized ones such as
+/// `dyn ObjectStore`) by the blanket impl below.
+pub trait ObjectStoreExt: ObjectStore {
+ /// Save the provided bytes to the specified location
+ ///
+ /// The operation is guaranteed to be atomic, it will either successfully
+ /// write the entirety of `payload` to `location`, or fail. No clients
+ /// should be able to observe a partially written object
+ ///
+ /// This is shorthand for [`ObjectStore::put_opts`] with default
+ /// [`PutOptions`].
+ // NOTE(review): the returned future is not declared `+ Send`, so code that is
+ // generic over `S: ObjectStoreExt` cannot rely on it being `Send` (e.g. when
+ // passing it to `tokio::spawn`); confirm whether `+ Send` belongs in the contract.
+ fn put(&self, location: &Path, payload: PutPayload) -> impl Future<Output
= Result<PutResult>>;
+}
+
+// Blanket impl: every `ObjectStore` gets the extension methods for free. The
+// `?Sized` bound extends this to trait objects such as `dyn ObjectStore`.
+impl<T> ObjectStoreExt for T
+where
+ T: ObjectStore + ?Sized,
+{
+ async fn put(&self, location: &Path, payload: PutPayload) ->
Result<PutResult> {
+ // Delegate to the required trait method using default options.
+ self.put_opts(location, payload, PutOptions::default())
+ .await
+ }
+}
+
/// Result of a list call that includes objects, prefixes (directories) and a
/// token for the next set of results. Individual result sets may be limited to
/// 1,000 objects based on the underlying object storage's limitations.
diff --git a/src/limit.rs b/src/limit.rs
index 7102f06..45764e8 100644
--- a/src/limit.rs
+++ b/src/limit.rs
@@ -71,11 +71,6 @@ impl<T: ObjectStore> std::fmt::Display for LimitStore<T> {
#[async_trait]
impl<T: ObjectStore> ObjectStore for LimitStore<T> {
- async fn put(&self, location: &Path, payload: PutPayload) ->
Result<PutResult> {
- let _permit = self.semaphore.acquire().await.unwrap();
- self.inner.put(location, payload).await
- }
-
async fn put_opts(
&self,
location: &Path,
diff --git a/src/local.rs b/src/local.rs
index 31db111..44ed654 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -1124,7 +1124,7 @@ mod tests {
#[cfg(target_family = "unix")]
use tempfile::NamedTempFile;
- use crate::integration::*;
+ use crate::{ObjectStoreExt, integration::*};
use super::*;
diff --git a/src/memory.rs b/src/memory.rs
index 55d75fd..de9113e 100644
--- a/src/memory.rs
+++ b/src/memory.rs
@@ -550,7 +550,7 @@ impl MultipartUpload for InMemoryUpload {
#[cfg(test)]
mod tests {
- use crate::integration::*;
+ use crate::{ObjectStoreExt, integration::*};
use super::*;
diff --git a/src/prefix.rs b/src/prefix.rs
index a455826..559cd3b 100644
--- a/src/prefix.rs
+++ b/src/prefix.rs
@@ -94,11 +94,6 @@ fn strip_meta(prefix: &Path, meta: ObjectMeta) -> ObjectMeta
{
#[async_trait::async_trait]
impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
- async fn put(&self, location: &Path, payload: PutPayload) ->
Result<PutResult> {
- let full_path = self.full_path(location);
- self.inner.put(&full_path, payload).await
- }
-
async fn put_opts(
&self,
location: &Path,
@@ -238,8 +233,8 @@ mod tests {
use std::slice;
use super::*;
- use crate::integration::*;
use crate::local::LocalFileSystem;
+ use crate::{ObjectStoreExt, integration::*};
use tempfile::TempDir;
diff --git a/src/throttle.rs b/src/throttle.rs
index 9419ae8..18b37b8 100644
--- a/src/throttle.rs
+++ b/src/throttle.rs
@@ -90,7 +90,7 @@ pub struct ThrottleConfig {
///
[`wait_list_with_delimiter_per_call`](Self::wait_list_with_delimiter_per_call).
pub wait_list_with_delimiter_per_entry: Duration,
- /// Sleep duration for every call to [`put`](ThrottledStore::put).
+ /// Sleep duration for every call to [`put_opts`](ThrottledStore::put_opts), including
+ /// calls made through [`ObjectStoreExt::put`](crate::ObjectStoreExt::put).
///
/// Sleeping is done before the underlying store is called and
independently of the success of
/// the operation.
@@ -148,11 +148,6 @@ impl<T: ObjectStore> std::fmt::Display for
ThrottledStore<T> {
#[async_trait]
impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
- async fn put(&self, location: &Path, payload: PutPayload) ->
Result<PutResult> {
- sleep(self.config().wait_put_per_call).await;
- self.inner.put(location, payload).await
- }
-
async fn put_opts(
&self,
location: &Path,
@@ -419,6 +414,7 @@ mod tests {
use super::*;
#[cfg(target_os = "linux")]
use crate::GetResultPayload;
+ use crate::ObjectStoreExt;
use crate::{integration::*, memory::InMemory};
use futures::TryStreamExt;
use tokio::time::Duration;