This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 820e40a278 Add range and ObjectMeta to GetResult (#4352) (#4495) (#4677)
820e40a278 is described below
commit 820e40a27863f3eb8b1e95856107c2c0e4d81722
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Aug 14 12:11:43 2023 +0100
Add range and ObjectMeta to GetResult (#4352) (#4495) (#4677)
* Add range and ObjectMeta to GetResult (#4352) (#4495)
* Review feedback
* Fix docs
---
object_store/src/chunked.rs | 126 +++++++++++++++++------------------------
object_store/src/client/get.rs | 15 ++++-
object_store/src/http/mod.rs | 19 ++++++-
object_store/src/lib.rs | 103 +++++++++++++--------------------
object_store/src/limit.rs | 30 +++++-----
object_store/src/local.rs | 80 ++++++++++++++++++++------
object_store/src/memory.rs | 50 ++++++++++------
object_store/src/throttle.rs | 27 +++++----
8 files changed, 252 insertions(+), 198 deletions(-)
diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs
index c639d7e898..008dec6794 100644
--- a/object_store/src/chunked.rs
+++ b/object_store/src/chunked.rs
@@ -18,7 +18,6 @@
//! A [`ChunkedStore`] that can be used to test streaming behaviour
use std::fmt::{Debug, Display, Formatter};
-use std::io::{BufReader, Read};
use std::ops::Range;
use std::sync::Arc;
@@ -29,8 +28,9 @@ use futures::StreamExt;
use tokio::io::AsyncWrite;
use crate::path::Path;
-use crate::util::maybe_spawn_blocking;
-use crate::{GetOptions, GetResult, ListResult, ObjectMeta, ObjectStore};
+use crate::{
+ GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta,
ObjectStore,
+};
use crate::{MultipartId, Result};
/// Wraps a [`ObjectStore`] and makes its get response return chunks
@@ -82,77 +82,57 @@ impl ObjectStore for ChunkedStore {
}
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
- match self.inner.get_opts(location, options).await? {
- GetResult::File(std_file, ..) => {
- let reader = BufReader::new(std_file);
- let chunk_size = self.chunk_size;
- Ok(GetResult::Stream(
- futures::stream::try_unfold(reader, move |mut reader|
async move {
- let (r, out, reader) = maybe_spawn_blocking(move || {
- let mut out = Vec::with_capacity(chunk_size);
- let r = (&mut reader)
- .take(chunk_size as u64)
- .read_to_end(&mut out)
- .map_err(|err| crate::Error::Generic {
- store: "ChunkedStore",
- source: Box::new(err),
- })?;
- Ok((r, out, reader))
- })
- .await?;
-
- match r {
- 0 => Ok(None),
- _ => Ok(Some((out.into(), reader))),
- }
- })
- .boxed(),
- ))
+ let r = self.inner.get_opts(location, options).await?;
+ let stream = match r.payload {
+ GetResultPayload::File(file, path) => {
+ crate::local::chunked_stream(file, path, r.range.clone(),
self.chunk_size)
}
- GetResult::Stream(stream) => {
+ GetResultPayload::Stream(stream) => {
let buffer = BytesMut::new();
- Ok(GetResult::Stream(
- futures::stream::unfold(
- (stream, buffer, false, self.chunk_size),
- |(mut stream, mut buffer, mut exhausted, chunk_size)|
async move {
- // Keep accumulating bytes until we reach capacity
as long as
- // the stream can provide them:
- if exhausted {
- return None;
- }
- while buffer.len() < chunk_size {
- match stream.next().await {
- None => {
- exhausted = true;
- let slice =
buffer.split_off(0).freeze();
- return Some((
- Ok(slice),
- (stream, buffer, exhausted,
chunk_size),
- ));
- }
- Some(Ok(bytes)) => {
- buffer.put(bytes);
- }
- Some(Err(e)) => {
- return Some((
- Err(crate::Error::Generic {
- store: "ChunkedStore",
- source: Box::new(e),
- }),
- (stream, buffer, exhausted,
chunk_size),
- ))
- }
- };
- }
- // Return the chunked values as the next value in
the stream
- let slice = buffer.split_to(chunk_size).freeze();
- Some((Ok(slice), (stream, buffer, exhausted,
chunk_size)))
- },
- )
- .boxed(),
- ))
+ futures::stream::unfold(
+ (stream, buffer, false, self.chunk_size),
+ |(mut stream, mut buffer, mut exhausted, chunk_size)|
async move {
+ // Keep accumulating bytes until we reach capacity as
long as
+ // the stream can provide them:
+ if exhausted {
+ return None;
+ }
+ while buffer.len() < chunk_size {
+ match stream.next().await {
+ None => {
+ exhausted = true;
+ let slice = buffer.split_off(0).freeze();
+ return Some((
+ Ok(slice),
+ (stream, buffer, exhausted,
chunk_size),
+ ));
+ }
+ Some(Ok(bytes)) => {
+ buffer.put(bytes);
+ }
+ Some(Err(e)) => {
+ return Some((
+ Err(crate::Error::Generic {
+ store: "ChunkedStore",
+ source: Box::new(e),
+ }),
+ (stream, buffer, exhausted,
chunk_size),
+ ))
+ }
+ };
+ }
+ // Return the chunked values as the next value in the
stream
+ let slice = buffer.split_to(chunk_size).freeze();
+ Some((Ok(slice), (stream, buffer, exhausted,
chunk_size)))
+ },
+ )
+ .boxed()
}
- }
+ };
+ Ok(GetResult {
+ payload: GetResultPayload::Stream(stream),
+ ..r
+ })
}
async fn get_range(&self, location: &Path, range: Range<usize>) ->
Result<Bytes> {
@@ -217,8 +197,8 @@ mod tests {
for chunk_size in [10, 20, 31] {
let store = ChunkedStore::new(Arc::clone(&store), chunk_size);
- let mut s = match store.get(&location).await.unwrap() {
- GetResult::Stream(s) => s,
+ let mut s = match store.get(&location).await.unwrap().payload {
+ GetResultPayload::Stream(s) => s,
_ => unreachable!(),
};
diff --git a/object_store/src/client/get.rs b/object_store/src/client/get.rs
index 3c66a72d82..6b2d60ae56 100644
--- a/object_store/src/client/get.rs
+++ b/object_store/src/client/get.rs
@@ -17,8 +17,8 @@
use crate::client::header::header_meta;
use crate::path::Path;
-use crate::Result;
use crate::{Error, GetOptions, GetResult, ObjectMeta};
+use crate::{GetResultPayload, Result};
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use reqwest::Response;
@@ -47,7 +47,14 @@ pub trait GetClientExt {
#[async_trait]
impl<T: GetClient> GetClientExt for T {
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
+ let range = options.range.clone();
let response = self.get_request(location, options, false).await?;
+ let meta =
+ header_meta(location, response.headers()).map_err(|e|
Error::Generic {
+ store: T::STORE,
+ source: Box::new(e),
+ })?;
+
let stream = response
.bytes_stream()
.map_err(|source| Error::Generic {
@@ -56,7 +63,11 @@ impl<T: GetClient> GetClientExt for T {
})
.boxed();
- Ok(GetResult::Stream(stream))
+ Ok(GetResult {
+ range: range.unwrap_or(0..meta.size),
+ payload: GetResultPayload::Stream(stream),
+ meta,
+ })
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs
index 6927f1b883..e8e7b459e1 100644
--- a/object_store/src/http/mod.rs
+++ b/object_store/src/http/mod.rs
@@ -40,11 +40,12 @@ use snafu::{OptionExt, ResultExt, Snafu};
use tokio::io::AsyncWrite;
use url::Url;
+use crate::client::header::header_meta;
use crate::http::client::Client;
use crate::path::Path;
use crate::{
- ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult,
MultipartId,
- ObjectMeta, ObjectStore, Result, RetryConfig,
+ ClientConfigKey, ClientOptions, GetOptions, GetResult, GetResultPayload,
ListResult,
+ MultipartId, ObjectMeta, ObjectStore, Result, RetryConfig,
};
mod client;
@@ -60,6 +61,11 @@ enum Error {
url: String,
},
+ #[snafu(display("Unable to extract metadata from headers: {}", source))]
+ Metadata {
+ source: crate::client::header::Error,
+ },
+
#[snafu(display("Request error: {}", source))]
Reqwest { source: reqwest::Error },
}
@@ -109,13 +115,20 @@ impl ObjectStore for HttpStore {
}
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
+ let range = options.range.clone();
let response = self.client.get(location, options).await?;
+ let meta = header_meta(location,
response.headers()).context(MetadataSnafu)?;
+
let stream = response
.bytes_stream()
.map_err(|source| Error::Reqwest { source }.into())
.boxed();
- Ok(GetResult::Stream(stream))
+ Ok(GetResult {
+ payload: GetResultPayload::Stream(stream),
+ range: range.unwrap_or(0..meta.size),
+ meta,
+ })
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index cf7e47998a..7496b589cd 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -374,8 +374,6 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync +
Debug + 'static {
}
/// Perform a get request with options
- ///
- /// Note: options.range will be ignored if [`GetResult::File`]
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult>;
/// Return the bytes that are stored at the specified location
@@ -385,17 +383,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync +
Debug + 'static {
range: Some(range.clone()),
..Default::default()
};
- // Temporary until GetResult::File supports range (#4352)
- match self.get_opts(location, options).await? {
- GetResult::Stream(s) => collect_bytes(s, None).await,
- #[cfg(not(target_arch = "wasm32"))]
- GetResult::File(mut file, path) => {
- maybe_spawn_blocking(move || local::read_range(&mut file,
&path, range))
- .await
- }
- #[cfg(target_arch = "wasm32")]
- _ => unimplemented!("File IO not implemented on wasm32."),
- }
+ self.get_opts(location, options).await?.bytes().await
}
/// Return the bytes that are stored at the specified location
@@ -751,21 +739,32 @@ impl GetOptions {
}
/// Result for a get request
+#[derive(Debug)]
+pub struct GetResult {
+ /// The [`GetResultPayload`]
+ pub payload: GetResultPayload,
+ /// The [`ObjectMeta`] for this object
+ pub meta: ObjectMeta,
+ /// The range of bytes returned by this request
+ pub range: Range<usize>,
+}
+
+/// The kind of a [`GetResult`]
///
/// This special cases the case of a local file, as some systems may
/// be able to optimise the case of a file already present on local disk
-pub enum GetResult {
- /// A file and its path on the local filesystem
+pub enum GetResultPayload {
+ /// The file, path
File(std::fs::File, std::path::PathBuf),
- /// An asynchronous stream
+ /// An opaque stream of bytes
Stream(BoxStream<'static, Result<Bytes>>),
}
-impl Debug for GetResult {
+impl Debug for GetResultPayload {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
- Self::File(_, _) => write!(f, "GetResult(File)"),
- Self::Stream(_) => write!(f, "GetResult(Stream)"),
+ Self::File(_, _) => write!(f, "GetResultPayload(File)"),
+ Self::Stream(_) => write!(f, "GetResultPayload(Stream)"),
}
}
}
@@ -773,32 +772,31 @@ impl Debug for GetResult {
impl GetResult {
/// Collects the data into a [`Bytes`]
pub async fn bytes(self) -> Result<Bytes> {
- match self {
+ let len = self.range.end - self.range.start;
+ match self.payload {
#[cfg(not(target_arch = "wasm32"))]
- Self::File(mut file, path) => {
+ GetResultPayload::File(mut file, path) => {
maybe_spawn_blocking(move || {
- let len = file.seek(SeekFrom::End(0)).map_err(|source| {
- local::Error::Seek {
+ file.seek(SeekFrom::Start(self.range.start as _)).map_err(
+ |source| local::Error::Seek {
source,
path: path.clone(),
- }
- })?;
-
- file.rewind().map_err(|source| local::Error::Seek {
- source,
- path: path.clone(),
- })?;
+ },
+ )?;
- let mut buffer = Vec::with_capacity(len as usize);
- file.read_to_end(&mut buffer).map_err(|source| {
- local::Error::UnableToReadBytes { source, path }
- })?;
+ let mut buffer = Vec::with_capacity(len);
+ file.take(len as _)
+ .read_to_end(&mut buffer)
+ .map_err(|source| local::Error::UnableToReadBytes {
+ source,
+ path,
+ })?;
Ok(buffer.into())
})
.await
}
- Self::Stream(s) => collect_bytes(s, None).await,
+ GetResultPayload::Stream(s) => collect_bytes(s, Some(len)).await,
#[cfg(target_arch = "wasm32")]
_ => unimplemented!("File IO not implemented on wasm32."),
}
@@ -806,8 +804,8 @@ impl GetResult {
/// Converts this into a byte stream
///
- /// If the result is [`Self::File`] will perform chunked reads of the
file, otherwise
- /// will return the [`Self::Stream`].
+ /// If the `self.kind` is [`GetResultPayload::File`] will perform chunked
reads of the file,
+ /// otherwise will return the [`GetResultPayload::Stream`].
///
/// # Tokio Compatibility
///
@@ -819,36 +817,13 @@ impl GetResult {
/// If not called from a tokio context, this will perform IO on the
current thread with
/// no additional complexity or overheads
pub fn into_stream(self) -> BoxStream<'static, Result<Bytes>> {
- match self {
+ match self.payload {
#[cfg(not(target_arch = "wasm32"))]
- Self::File(file, path) => {
+ GetResultPayload::File(file, path) => {
const CHUNK_SIZE: usize = 8 * 1024;
-
- futures::stream::try_unfold(
- (file, path, false),
- |(mut file, path, finished)| {
- maybe_spawn_blocking(move || {
- if finished {
- return Ok(None);
- }
-
- let mut buffer = Vec::with_capacity(CHUNK_SIZE);
- let read = file
- .by_ref()
- .take(CHUNK_SIZE as u64)
- .read_to_end(&mut buffer)
- .map_err(|e| local::Error::UnableToReadBytes {
- source: e,
- path: path.clone(),
- })?;
-
- Ok(Some((buffer.into(), (file, path, read !=
CHUNK_SIZE))))
- })
- },
- )
- .boxed()
+ local::chunked_stream(file, path, self.range, CHUNK_SIZE)
}
- Self::Stream(s) => s,
+ GetResultPayload::Stream(s) => s,
#[cfg(target_arch = "wasm32")]
_ => unimplemented!("File IO not implemented on wasm32."),
}
diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs
index 630fd145b7..a9b8c4b050 100644
--- a/object_store/src/limit.rs
+++ b/object_store/src/limit.rs
@@ -18,8 +18,8 @@
//! An object store that limits the maximum concurrency of the wrapped
implementation
use crate::{
- BoxStream, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
ObjectStore,
- Path, Result, StreamExt,
+ BoxStream, GetOptions, GetResult, GetResultPayload, ListResult,
MultipartId,
+ ObjectMeta, ObjectStore, Path, Result, StreamExt,
};
use async_trait::async_trait;
use bytes::Bytes;
@@ -106,22 +106,14 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
async fn get(&self, location: &Path) -> Result<GetResult> {
let permit =
Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
- match self.inner.get(location).await? {
- r @ GetResult::File(_, _) => Ok(r),
- GetResult::Stream(s) => {
- Ok(GetResult::Stream(PermitWrapper::new(s, permit).boxed()))
- }
- }
+ let r = self.inner.get(location).await?;
+ Ok(permit_get_result(r, permit))
}
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
let permit =
Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
- match self.inner.get_opts(location, options).await? {
- r @ GetResult::File(_, _) => Ok(r),
- GetResult::Stream(s) => {
- Ok(GetResult::Stream(PermitWrapper::new(s, permit).boxed()))
- }
- }
+ let r = self.inner.get_opts(location, options).await?;
+ Ok(permit_get_result(r, permit))
}
async fn get_range(&self, location: &Path, range: Range<usize>) ->
Result<Bytes> {
@@ -200,6 +192,16 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
}
}
+fn permit_get_result(r: GetResult, permit: OwnedSemaphorePermit) -> GetResult {
+ let payload = match r.payload {
+ v @ GetResultPayload::File(_, _) => v,
+ GetResultPayload::Stream(s) => {
+ GetResultPayload::Stream(PermitWrapper::new(s, permit).boxed())
+ }
+ };
+ GetResult { payload, ..r }
+}
+
/// Combines an [`OwnedSemaphorePermit`] with some other type
struct PermitWrapper<T> {
inner: T,
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index a0933cc617..4d57ef1b79 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -19,16 +19,17 @@
use crate::{
maybe_spawn_blocking,
path::{absolute_path_to_url, Path},
- GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
Result,
+ GetOptions, GetResult, GetResultPayload, ListResult, MultipartId,
ObjectMeta,
+ ObjectStore, Result,
};
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::future::BoxFuture;
-use futures::FutureExt;
use futures::{stream::BoxStream, StreamExt};
+use futures::{FutureExt, TryStreamExt};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
-use std::fs::{metadata, symlink_metadata, File, OpenOptions};
+use std::fs::{metadata, symlink_metadata, File, Metadata, OpenOptions};
use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
use std::ops::Range;
use std::pin::Pin;
@@ -370,18 +371,20 @@ impl ObjectStore for LocalFileSystem {
let location = location.clone();
let path = self.config.path_to_filesystem(&location)?;
maybe_spawn_blocking(move || {
- let file = open_file(&path)?;
+ let (file, metadata) = open_file(&path)?;
if options.if_unmodified_since.is_some()
|| options.if_modified_since.is_some()
{
- let metadata = file.metadata().map_err(|e| Error::Metadata {
- source: e.into(),
- path: location.to_string(),
- })?;
options.check_modified(&location, last_modified(&metadata))?;
}
- Ok(GetResult::File(file, path))
+ let meta = convert_metadata(metadata, location)?;
+
+ Ok(GetResult {
+ payload: GetResultPayload::File(file, path),
+ range: options.range.unwrap_or(0..meta.size),
+ meta,
+ })
})
.await
}
@@ -389,7 +392,7 @@ impl ObjectStore for LocalFileSystem {
async fn get_range(&self, location: &Path, range: Range<usize>) ->
Result<Bytes> {
let path = self.config.path_to_filesystem(location)?;
maybe_spawn_blocking(move || {
- let mut file = open_file(&path)?;
+ let (mut file, _) = open_file(&path)?;
read_range(&mut file, &path, range)
})
.await
@@ -404,7 +407,7 @@ impl ObjectStore for LocalFileSystem {
let ranges = ranges.to_vec();
maybe_spawn_blocking(move || {
// Vectored IO might be faster
- let mut file = open_file(&path)?;
+ let (mut file, _) = open_file(&path)?;
ranges
.into_iter()
.map(|r| read_range(&mut file, &path, r))
@@ -863,6 +866,51 @@ impl AsyncWrite for LocalUpload {
}
}
+pub(crate) fn chunked_stream(
+ mut file: File,
+ path: PathBuf,
+ range: Range<usize>,
+ chunk_size: usize,
+) -> BoxStream<'static, Result<Bytes, super::Error>> {
+ futures::stream::once(async move {
+ let (file, path) = maybe_spawn_blocking(move || {
+ file.seek(SeekFrom::Start(range.start as _))
+ .map_err(|source| Error::Seek {
+ source,
+ path: path.clone(),
+ })?;
+ Ok((file, path))
+ })
+ .await?;
+
+ let stream = futures::stream::try_unfold(
+ (file, path, range.end - range.start),
+ move |(mut file, path, remaining)| {
+ maybe_spawn_blocking(move || {
+ if remaining == 0 {
+ return Ok(None);
+ }
+
+ let to_read = remaining.min(chunk_size);
+ let mut buffer = Vec::with_capacity(to_read);
+ let read = (&mut file)
+ .take(to_read as u64)
+ .read_to_end(&mut buffer)
+ .map_err(|e| Error::UnableToReadBytes {
+ source: e,
+ path: path.clone(),
+ })?;
+
+ Ok(Some((buffer.into(), (file, path, remaining - read))))
+ })
+ },
+ );
+ Ok::<_, super::Error>(stream)
+ })
+ .try_flatten()
+ .boxed()
+}
+
pub(crate) fn read_range(
file: &mut File,
path: &PathBuf,
@@ -889,8 +937,8 @@ pub(crate) fn read_range(
Ok(buf.into())
}
-fn open_file(path: &PathBuf) -> Result<File> {
- let file = match File::open(path).and_then(|f| Ok((f.metadata()?, f))) {
+fn open_file(path: &PathBuf) -> Result<(File, Metadata)> {
+ let ret = match File::open(path).and_then(|f| Ok((f.metadata()?, f))) {
Err(e) => Err(match e.kind() {
ErrorKind::NotFound => Error::NotFound {
path: path.clone(),
@@ -902,14 +950,14 @@ fn open_file(path: &PathBuf) -> Result<File> {
},
}),
Ok((metadata, file)) => match !metadata.is_dir() {
- true => Ok(file),
+ true => Ok((file, metadata)),
false => Err(Error::NotFound {
path: path.clone(),
source: io::Error::new(ErrorKind::NotFound, "is directory"),
}),
},
}?;
- Ok(file)
+ Ok(ret)
}
fn convert_entry(entry: DirEntry, location: Path) -> Result<ObjectMeta> {
@@ -927,7 +975,7 @@ fn last_modified(metadata: &std::fs::Metadata) ->
DateTime<Utc> {
.into()
}
-fn convert_metadata(metadata: std::fs::Metadata, location: Path) ->
Result<ObjectMeta> {
+fn convert_metadata(metadata: Metadata, location: Path) -> Result<ObjectMeta> {
let last_modified = last_modified(&metadata);
let size =
usize::try_from(metadata.len()).context(FileSizeOverflowedUsizeSnafu {
path: location.as_ref(),
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index cfc2ac8230..1e8e3c1fd0 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -16,7 +16,9 @@
// under the License.
//! An in-memory object store implementation
-use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore,
Result};
+use crate::{
+ path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta,
ObjectStore, Result,
+};
use crate::{GetOptions, MultipartId};
use async_trait::async_trait;
use bytes::Bytes;
@@ -43,11 +45,13 @@ enum Error {
#[snafu(display("No data in memory found. Location: {path}"))]
NoDataInMemory { path: String },
- #[snafu(display("Out of range"))]
- OutOfRange,
+ #[snafu(display(
+ "Requested range {}..{} is out of bounds for object with length {}",
range.start, range.end, len
+ ))]
+ OutOfRange { range: Range<usize>, len: usize },
- #[snafu(display("Bad range"))]
- BadRange,
+ #[snafu(display("Invalid range: {}..{}", range.start, range.end))]
+ BadRange { range: Range<usize> },
#[snafu(display("Object already exists at that location: {path}"))]
AlreadyExists { path: String },
@@ -136,17 +140,29 @@ impl ObjectStore for InMemory {
}
let (data, last_modified) = self.entry(location).await?;
options.check_modified(location, last_modified)?;
+ let meta = ObjectMeta {
+ location: location.clone(),
+ last_modified,
+ size: data.len(),
+ e_tag: None,
+ };
+ let (range, data) = match options.range {
+ Some(range) => {
+ let len = data.len();
+ ensure!(range.end <= len, OutOfRangeSnafu { range, len });
+ ensure!(range.start <= range.end, BadRangeSnafu { range });
+ (range.clone(), data.slice(range))
+ }
+ None => (0..data.len(), data),
+ };
let stream = futures::stream::once(futures::future::ready(Ok(data)));
- Ok(GetResult::Stream(stream.boxed()))
- }
-
- async fn get_range(&self, location: &Path, range: Range<usize>) ->
Result<Bytes> {
- let data = self.entry(location).await?;
- ensure!(range.end <= data.0.len(), OutOfRangeSnafu);
- ensure!(range.start <= range.end, BadRangeSnafu);
- Ok(data.0.slice(range))
+ Ok(GetResult {
+ payload: GetResultPayload::Stream(stream.boxed()),
+ meta,
+ range,
+ })
}
async fn get_ranges(
@@ -158,9 +174,11 @@ impl ObjectStore for InMemory {
ranges
.iter()
.map(|range| {
- ensure!(range.end <= data.0.len(), OutOfRangeSnafu);
- ensure!(range.start <= range.end, BadRangeSnafu);
- Ok(data.0.slice(range.clone()))
+ let range = range.clone();
+ let len = data.0.len();
+ ensure!(range.end <= data.0.len(), OutOfRangeSnafu { range,
len });
+ ensure!(range.start <= range.end, BadRangeSnafu { range });
+ Ok(data.0.slice(range))
})
.collect()
}
diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs
index fb90afcec9..58c476ab45 100644
--- a/object_store/src/throttle.rs
+++ b/object_store/src/throttle.rs
@@ -20,7 +20,9 @@ use parking_lot::Mutex;
use std::ops::Range;
use std::{convert::TryInto, sync::Arc};
-use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore,
Result};
+use crate::{
+ path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta,
ObjectStore, Result,
+};
use crate::{GetOptions, MultipartId};
use async_trait::async_trait;
use bytes::Bytes;
@@ -301,15 +303,20 @@ fn usize_to_u32_saturate(x: usize) -> u32 {
}
fn throttle_get(result: GetResult, wait_get_per_byte: Duration) -> GetResult {
- let s = match result {
- GetResult::Stream(s) => s,
- GetResult::File(_, _) => unimplemented!(),
+ let s = match result.payload {
+ GetResultPayload::Stream(s) => s,
+ GetResultPayload::File(_, _) => unimplemented!(),
};
- GetResult::Stream(throttle_stream(s, move |bytes| {
+ let stream = throttle_stream(s, move |bytes| {
let bytes_len: u32 = usize_to_u32_saturate(bytes.len());
wait_get_per_byte * bytes_len
- }))
+ });
+
+ GetResult {
+ payload: GetResultPayload::Stream(stream),
+ ..result
+ }
}
fn throttle_stream<T: Send + 'static, E: Send + 'static, F>(
@@ -330,7 +337,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
- use crate::{memory::InMemory, tests::*};
+ use crate::{memory::InMemory, tests::*, GetResultPayload};
use bytes::Bytes;
use futures::TryStreamExt;
use tokio::time::Duration;
@@ -550,9 +557,9 @@ mod tests {
let res = store.get(&path).await;
if n_bytes.is_some() {
// need to consume bytes to provoke sleep times
- let s = match res.unwrap() {
- GetResult::Stream(s) => s,
- GetResult::File(_, _) => unimplemented!(),
+ let s = match res.unwrap().payload {
+ GetResultPayload::Stream(s) => s,
+ GetResultPayload::File(_, _) => unimplemented!(),
};
s.map_ok(|b| bytes::BytesMut::from(&b[..]))