This is an automated email from the ASF dual-hosted git repository.
mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new fa48f9a refactor!: move `get_range` to `ObjectStoreExt` (#536)
fa48f9a is described below
commit fa48f9af5aaa434a34db69e49235e6f94d3bc5c8
Author: Marco Neumann <[email protected]>
AuthorDate: Mon Nov 10 12:42:56 2025 +0100
refactor!: move `get_range` to `ObjectStoreExt` (#536)
---
src/buffered.rs | 8 +++---
src/chunked.rs | 5 +---
src/lib.rs | 86 ++++++++++++++++++++++++++++-----------------------------
src/limit.rs | 5 ----
src/local.rs | 9 ------
src/prefix.rs | 5 ----
src/throttle.rs | 11 --------
7 files changed, 47 insertions(+), 82 deletions(-)
diff --git a/src/buffered.rs b/src/buffered.rs
index 4502082..d22ecda 100644
--- a/src/buffered.rs
+++ b/src/buffered.rs
@@ -19,8 +19,8 @@
use crate::path::Path;
use crate::{
- Attributes, Extensions, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions,
- PutPayloadMut, TagSet, WriteMultipart,
+ Attributes, Extensions, ObjectMeta, ObjectStore, ObjectStoreExt, PutMultipartOptions,
+ PutOptions, PutPayloadMut, TagSet, WriteMultipart,
};
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
@@ -37,7 +37,7 @@ pub const DEFAULT_BUFFER_SIZE: usize = 1024 * 1024;
/// An async-buffered reader compatible with the tokio IO traits
///
-/// Internally this maintains a buffer of the requested size, and uses [`ObjectStore::get_range`]
+/// Internally this maintains a buffer of the requested size, and uses [`ObjectStoreExt::get_range`]
/// to populate its internal buffer once depleted. This buffer is cleared on
seek.
///
/// Whilst simple, this interface will typically be outperformed by the native
[`ObjectStore`]
@@ -46,7 +46,7 @@ pub const DEFAULT_BUFFER_SIZE: usize = 1024 * 1024;
/// round-trips is critical to throughput.
///
/// Systems looking to sequentially scan a file should instead consider using
[`ObjectStoreExt::get`],
-/// or [`ObjectStore::get_opts`], or [`ObjectStore::get_range`] to read a particular range.
+/// or [`ObjectStore::get_opts`], or [`ObjectStoreExt::get_range`] to read a particular range.
///
/// Systems looking to read multiple ranges of a file should instead consider
using
/// [`ObjectStore::get_ranges`], which will optimise the vectored IO.
diff --git a/src/chunked.rs b/src/chunked.rs
index cbd66f6..4db6071 100644
--- a/src/chunked.rs
+++ b/src/chunked.rs
@@ -135,13 +135,10 @@ impl ObjectStore for ChunkedStore {
})
}
- async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
- self.inner.get_range(location, range).await
- }
-
async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) ->
Result<Vec<Bytes>> {
self.inner.get_ranges(location, ranges).await
}
+
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
self.inner.head(location).await
}
diff --git a/src/lib.rs b/src/lib.rs
index c4c39dc..dca9bcd 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -823,46 +823,6 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync +
Debug + 'static {
/// ```
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult>;
- /// Return the bytes that are stored at the specified location
- /// in the given byte range.
- ///
- /// See [`GetRange::Bounded`] for more details on how `range` gets interpreted.
- ///
- /// To retrieve a range of bytes from a versioned object, use [`ObjectStore::get_opts`] by specifying the range in the [`GetOptions`].
- ///
- /// ## Examples
- ///
- /// This example uses a basic local filesystem object store to get a byte range from an object.
- ///
- /// ```ignore-wasm32
- /// # use object_store::local::LocalFileSystem;
- /// # use tempfile::tempdir;
- /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt};
- /// async fn get_range_example() {
- /// let tmp = tempdir().unwrap();
- /// let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
- /// let location = Path::from("example.txt");
- /// let content = b"Hello, Object Store!";
- ///
- /// // Put the object into the store
- /// store
- /// .put(&location, content.as_ref().into())
- /// .await
- /// .expect("Failed to put object");
- ///
- /// // Get the object from the store
- /// let bytes = store
- /// .get_range(&location, 0..5)
- /// .await
- /// .expect("Failed to get object");
- /// println!("Retrieved range [0-5]: {}", String::from_utf8_lossy(&bytes));
- /// }
- /// ```
- async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
- let options = GetOptions::new().with_range(Some(range));
- self.get_opts(location, options).await?.bytes().await
- }
-
/// Return the bytes that are stored at the specified location
/// in the given byte ranges
async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) ->
Result<Vec<Bytes>> {
@@ -1138,10 +1098,6 @@ macro_rules! as_ref_impl {
self.as_ref().get_opts(location, options).await
}
- async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
- self.as_ref().get_range(location, range).await
- }
-
async fn get_ranges(
&self,
location: &Path,
@@ -1257,6 +1213,43 @@ pub trait ObjectStoreExt: ObjectStore {
/// }
/// ```
fn get(&self, location: &Path) -> impl Future<Output = Result<GetResult>>;
+
+ /// Return the bytes that are stored at the specified location
+ /// in the given byte range.
+ ///
+ /// See [`GetRange::Bounded`] for more details on how `range` gets interpreted.
+ ///
+ /// To retrieve a range of bytes from a versioned object, use [`ObjectStore::get_opts`] by specifying the range in the [`GetOptions`].
+ ///
+ /// ## Examples
+ ///
+ /// This example uses a basic local filesystem object store to get a byte range from an object.
+ ///
+ /// ```ignore-wasm32
+ /// # use object_store::local::LocalFileSystem;
+ /// # use tempfile::tempdir;
+ /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt};
+ /// async fn get_range_example() {
+ /// let tmp = tempdir().unwrap();
+ /// let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
+ /// let location = Path::from("example.txt");
+ /// let content = b"Hello, Object Store!";
+ ///
+ /// // Put the object into the store
+ /// store
+ /// .put(&location, content.as_ref().into())
+ /// .await
+ /// .expect("Failed to put object");
+ ///
+ /// // Get the object from the store
+ /// let bytes = store
+ /// .get_range(&location, 0..5)
+ /// .await
+ /// .expect("Failed to get object");
+ /// println!("Retrieved range [0-5]: {}", String::from_utf8_lossy(&bytes));
+ /// }
+ /// ```
+ fn get_range(&self, location: &Path, range: Range<u64>) -> impl Future<Output = Result<Bytes>>;
}
impl<T> ObjectStoreExt for T
@@ -1276,6 +1269,11 @@ where
async fn get(&self, location: &Path) -> Result<GetResult> {
self.get_opts(location, GetOptions::default()).await
}
+
+ async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
+ let options = GetOptions::new().with_range(Some(range));
+ self.get_opts(location, options).await?.bytes().await
+ }
}
/// Result of a list call that includes objects, prefixes (directories) and a
diff --git a/src/limit.rs b/src/limit.rs
index a46ba56..99f7d97 100644
--- a/src/limit.rs
+++ b/src/limit.rs
@@ -100,11 +100,6 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
Ok(permit_get_result(r, permit))
}
- async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
- let _permit = self.semaphore.acquire().await.unwrap();
- self.inner.get_range(location, range).await
- }
-
async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) ->
Result<Vec<Bytes>> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.get_ranges(location, ranges).await
diff --git a/src/local.rs b/src/local.rs
index ee26171..9eccd5f 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -429,15 +429,6 @@ impl ObjectStore for LocalFileSystem {
.await
}
- async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
- let path = self.path_to_filesystem(location)?;
- maybe_spawn_blocking(move || {
- let (mut file, _) = open_file(&path)?;
- read_range(&mut file, &path, range)
- })
- .await
- }
-
async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) ->
Result<Vec<Bytes>> {
let path = self.path_to_filesystem(location)?;
let ranges = ranges.to_vec();
diff --git a/src/prefix.rs b/src/prefix.rs
index 19d2ada..029ecb9 100644
--- a/src/prefix.rs
+++ b/src/prefix.rs
@@ -114,11 +114,6 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
self.inner.put_multipart_opts(&full_path, opts).await
}
- async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
- let full_path = self.full_path(location);
- self.inner.get_range(&full_path, range).await
- }
-
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
let full_path = self.full_path(location);
self.inner.get_opts(&full_path, options).await
diff --git a/src/throttle.rs b/src/throttle.rs
index 8b6196a..b31d454 100644
--- a/src/throttle.rs
+++ b/src/throttle.rs
@@ -181,17 +181,6 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
Ok(throttle_get(result, wait_get_per_byte))
}
- async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
- let config = self.config();
-
- let sleep_duration =
- config.wait_get_per_call + config.wait_get_per_byte * (range.end - range.start) as u32;
-
- sleep(sleep_duration).await;
-
- self.inner.get_range(location, range).await
- }
-
async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) ->
Result<Vec<Bytes>> {
let config = self.config();