This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 820e40a278 Add range and ObjectMeta to GetResult (#4352) (#4495) (#4677)
820e40a278 is described below

commit 820e40a27863f3eb8b1e95856107c2c0e4d81722
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Aug 14 12:11:43 2023 +0100

    Add range and ObjectMeta to GetResult (#4352) (#4495) (#4677)
    
    * Add range and ObjectMeta to GetResult (#4352) (#4495)
    
    * Review feedback
    
    * Fix docs
---
 object_store/src/chunked.rs    | 126 +++++++++++++++++------------------------
 object_store/src/client/get.rs |  15 ++++-
 object_store/src/http/mod.rs   |  19 ++++++-
 object_store/src/lib.rs        | 103 +++++++++++++--------------------
 object_store/src/limit.rs      |  30 +++++-----
 object_store/src/local.rs      |  80 ++++++++++++++++++++------
 object_store/src/memory.rs     |  50 ++++++++++------
 object_store/src/throttle.rs   |  27 +++++----
 8 files changed, 252 insertions(+), 198 deletions(-)

diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs
index c639d7e898..008dec6794 100644
--- a/object_store/src/chunked.rs
+++ b/object_store/src/chunked.rs
@@ -18,7 +18,6 @@
 //! A [`ChunkedStore`] that can be used to test streaming behaviour
 
 use std::fmt::{Debug, Display, Formatter};
-use std::io::{BufReader, Read};
 use std::ops::Range;
 use std::sync::Arc;
 
@@ -29,8 +28,9 @@ use futures::StreamExt;
 use tokio::io::AsyncWrite;
 
 use crate::path::Path;
-use crate::util::maybe_spawn_blocking;
-use crate::{GetOptions, GetResult, ListResult, ObjectMeta, ObjectStore};
+use crate::{
+    GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, 
ObjectStore,
+};
 use crate::{MultipartId, Result};
 
 /// Wraps a [`ObjectStore`] and makes its get response return chunks
@@ -82,77 +82,57 @@ impl ObjectStore for ChunkedStore {
     }
 
     async fn get_opts(&self, location: &Path, options: GetOptions) -> 
Result<GetResult> {
-        match self.inner.get_opts(location, options).await? {
-            GetResult::File(std_file, ..) => {
-                let reader = BufReader::new(std_file);
-                let chunk_size = self.chunk_size;
-                Ok(GetResult::Stream(
-                    futures::stream::try_unfold(reader, move |mut reader| 
async move {
-                        let (r, out, reader) = maybe_spawn_blocking(move || {
-                            let mut out = Vec::with_capacity(chunk_size);
-                            let r = (&mut reader)
-                                .take(chunk_size as u64)
-                                .read_to_end(&mut out)
-                                .map_err(|err| crate::Error::Generic {
-                                    store: "ChunkedStore",
-                                    source: Box::new(err),
-                                })?;
-                            Ok((r, out, reader))
-                        })
-                        .await?;
-
-                        match r {
-                            0 => Ok(None),
-                            _ => Ok(Some((out.into(), reader))),
-                        }
-                    })
-                    .boxed(),
-                ))
+        let r = self.inner.get_opts(location, options).await?;
+        let stream = match r.payload {
+            GetResultPayload::File(file, path) => {
+                crate::local::chunked_stream(file, path, r.range.clone(), 
self.chunk_size)
             }
-            GetResult::Stream(stream) => {
+            GetResultPayload::Stream(stream) => {
                 let buffer = BytesMut::new();
-                Ok(GetResult::Stream(
-                    futures::stream::unfold(
-                        (stream, buffer, false, self.chunk_size),
-                        |(mut stream, mut buffer, mut exhausted, chunk_size)| 
async move {
-                            // Keep accumulating bytes until we reach capacity 
as long as
-                            // the stream can provide them:
-                            if exhausted {
-                                return None;
-                            }
-                            while buffer.len() < chunk_size {
-                                match stream.next().await {
-                                    None => {
-                                        exhausted = true;
-                                        let slice = 
buffer.split_off(0).freeze();
-                                        return Some((
-                                            Ok(slice),
-                                            (stream, buffer, exhausted, 
chunk_size),
-                                        ));
-                                    }
-                                    Some(Ok(bytes)) => {
-                                        buffer.put(bytes);
-                                    }
-                                    Some(Err(e)) => {
-                                        return Some((
-                                            Err(crate::Error::Generic {
-                                                store: "ChunkedStore",
-                                                source: Box::new(e),
-                                            }),
-                                            (stream, buffer, exhausted, 
chunk_size),
-                                        ))
-                                    }
-                                };
-                            }
-                            // Return the chunked values as the next value in 
the stream
-                            let slice = buffer.split_to(chunk_size).freeze();
-                            Some((Ok(slice), (stream, buffer, exhausted, 
chunk_size)))
-                        },
-                    )
-                    .boxed(),
-                ))
+                futures::stream::unfold(
+                    (stream, buffer, false, self.chunk_size),
+                    |(mut stream, mut buffer, mut exhausted, chunk_size)| 
async move {
+                        // Keep accumulating bytes until we reach capacity as 
long as
+                        // the stream can provide them:
+                        if exhausted {
+                            return None;
+                        }
+                        while buffer.len() < chunk_size {
+                            match stream.next().await {
+                                None => {
+                                    exhausted = true;
+                                    let slice = buffer.split_off(0).freeze();
+                                    return Some((
+                                        Ok(slice),
+                                        (stream, buffer, exhausted, 
chunk_size),
+                                    ));
+                                }
+                                Some(Ok(bytes)) => {
+                                    buffer.put(bytes);
+                                }
+                                Some(Err(e)) => {
+                                    return Some((
+                                        Err(crate::Error::Generic {
+                                            store: "ChunkedStore",
+                                            source: Box::new(e),
+                                        }),
+                                        (stream, buffer, exhausted, 
chunk_size),
+                                    ))
+                                }
+                            };
+                        }
+                        // Return the chunked values as the next value in the 
stream
+                        let slice = buffer.split_to(chunk_size).freeze();
+                        Some((Ok(slice), (stream, buffer, exhausted, 
chunk_size)))
+                    },
+                )
+                .boxed()
             }
-        }
+        };
+        Ok(GetResult {
+            payload: GetResultPayload::Stream(stream),
+            ..r
+        })
     }
 
     async fn get_range(&self, location: &Path, range: Range<usize>) -> 
Result<Bytes> {
@@ -217,8 +197,8 @@ mod tests {
 
         for chunk_size in [10, 20, 31] {
             let store = ChunkedStore::new(Arc::clone(&store), chunk_size);
-            let mut s = match store.get(&location).await.unwrap() {
-                GetResult::Stream(s) => s,
+            let mut s = match store.get(&location).await.unwrap().payload {
+                GetResultPayload::Stream(s) => s,
                 _ => unreachable!(),
             };
 
diff --git a/object_store/src/client/get.rs b/object_store/src/client/get.rs
index 3c66a72d82..6b2d60ae56 100644
--- a/object_store/src/client/get.rs
+++ b/object_store/src/client/get.rs
@@ -17,8 +17,8 @@
 
 use crate::client::header::header_meta;
 use crate::path::Path;
-use crate::Result;
 use crate::{Error, GetOptions, GetResult, ObjectMeta};
+use crate::{GetResultPayload, Result};
 use async_trait::async_trait;
 use futures::{StreamExt, TryStreamExt};
 use reqwest::Response;
@@ -47,7 +47,14 @@ pub trait GetClientExt {
 #[async_trait]
 impl<T: GetClient> GetClientExt for T {
     async fn get_opts(&self, location: &Path, options: GetOptions) -> 
Result<GetResult> {
+        let range = options.range.clone();
         let response = self.get_request(location, options, false).await?;
+        let meta =
+            header_meta(location, response.headers()).map_err(|e| 
Error::Generic {
+                store: T::STORE,
+                source: Box::new(e),
+            })?;
+
         let stream = response
             .bytes_stream()
             .map_err(|source| Error::Generic {
@@ -56,7 +63,11 @@ impl<T: GetClient> GetClientExt for T {
             })
             .boxed();
 
-        Ok(GetResult::Stream(stream))
+        Ok(GetResult {
+            range: range.unwrap_or(0..meta.size),
+            payload: GetResultPayload::Stream(stream),
+            meta,
+        })
     }
 
     async fn head(&self, location: &Path) -> Result<ObjectMeta> {
diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs
index 6927f1b883..e8e7b459e1 100644
--- a/object_store/src/http/mod.rs
+++ b/object_store/src/http/mod.rs
@@ -40,11 +40,12 @@ use snafu::{OptionExt, ResultExt, Snafu};
 use tokio::io::AsyncWrite;
 use url::Url;
 
+use crate::client::header::header_meta;
 use crate::http::client::Client;
 use crate::path::Path;
 use crate::{
-    ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, 
MultipartId,
-    ObjectMeta, ObjectStore, Result, RetryConfig,
+    ClientConfigKey, ClientOptions, GetOptions, GetResult, GetResultPayload, 
ListResult,
+    MultipartId, ObjectMeta, ObjectStore, Result, RetryConfig,
 };
 
 mod client;
@@ -60,6 +61,11 @@ enum Error {
         url: String,
     },
 
+    #[snafu(display("Unable to extract metadata from headers: {}", source))]
+    Metadata {
+        source: crate::client::header::Error,
+    },
+
     #[snafu(display("Request error: {}", source))]
     Reqwest { source: reqwest::Error },
 }
@@ -109,13 +115,20 @@ impl ObjectStore for HttpStore {
     }
 
     async fn get_opts(&self, location: &Path, options: GetOptions) -> 
Result<GetResult> {
+        let range = options.range.clone();
         let response = self.client.get(location, options).await?;
+        let meta = header_meta(location, 
response.headers()).context(MetadataSnafu)?;
+
         let stream = response
             .bytes_stream()
             .map_err(|source| Error::Reqwest { source }.into())
             .boxed();
 
-        Ok(GetResult::Stream(stream))
+        Ok(GetResult {
+            payload: GetResultPayload::Stream(stream),
+            range: range.unwrap_or(0..meta.size),
+            meta,
+        })
     }
 
     async fn head(&self, location: &Path) -> Result<ObjectMeta> {
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index cf7e47998a..7496b589cd 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -374,8 +374,6 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + 
Debug + 'static {
     }
 
     /// Perform a get request with options
-    ///
-    /// Note: options.range will be ignored if [`GetResult::File`]
     async fn get_opts(&self, location: &Path, options: GetOptions) -> 
Result<GetResult>;
 
     /// Return the bytes that are stored at the specified location
@@ -385,17 +383,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + 
Debug + 'static {
             range: Some(range.clone()),
             ..Default::default()
         };
-        // Temporary until GetResult::File supports range (#4352)
-        match self.get_opts(location, options).await? {
-            GetResult::Stream(s) => collect_bytes(s, None).await,
-            #[cfg(not(target_arch = "wasm32"))]
-            GetResult::File(mut file, path) => {
-                maybe_spawn_blocking(move || local::read_range(&mut file, 
&path, range))
-                    .await
-            }
-            #[cfg(target_arch = "wasm32")]
-            _ => unimplemented!("File IO not implemented on wasm32."),
-        }
+        self.get_opts(location, options).await?.bytes().await
     }
 
     /// Return the bytes that are stored at the specified location
@@ -751,21 +739,32 @@ impl GetOptions {
 }
 
 /// Result for a get request
+#[derive(Debug)]
+pub struct GetResult {
+    /// The [`GetResultPayload`]
+    pub payload: GetResultPayload,
+    /// The [`ObjectMeta`] for this object
+    pub meta: ObjectMeta,
+    /// The range of bytes returned by this request
+    pub range: Range<usize>,
+}
+
+/// The kind of a [`GetResult`]
 ///
 /// This special cases the case of a local file, as some systems may
 /// be able to optimise the case of a file already present on local disk
-pub enum GetResult {
-    /// A file and its path on the local filesystem
+pub enum GetResultPayload {
+    /// The file, path
     File(std::fs::File, std::path::PathBuf),
-    /// An asynchronous stream
+    /// An opaque stream of bytes
     Stream(BoxStream<'static, Result<Bytes>>),
 }
 
-impl Debug for GetResult {
+impl Debug for GetResultPayload {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         match self {
-            Self::File(_, _) => write!(f, "GetResult(File)"),
-            Self::Stream(_) => write!(f, "GetResult(Stream)"),
+            Self::File(_, _) => write!(f, "GetResultPayload(File)"),
+            Self::Stream(_) => write!(f, "GetResultPayload(Stream)"),
         }
     }
 }
@@ -773,32 +772,31 @@ impl Debug for GetResult {
 impl GetResult {
     /// Collects the data into a [`Bytes`]
     pub async fn bytes(self) -> Result<Bytes> {
-        match self {
+        let len = self.range.end - self.range.start;
+        match self.payload {
             #[cfg(not(target_arch = "wasm32"))]
-            Self::File(mut file, path) => {
+            GetResultPayload::File(mut file, path) => {
                 maybe_spawn_blocking(move || {
-                    let len = file.seek(SeekFrom::End(0)).map_err(|source| {
-                        local::Error::Seek {
+                    file.seek(SeekFrom::Start(self.range.start as _)).map_err(
+                        |source| local::Error::Seek {
                             source,
                             path: path.clone(),
-                        }
-                    })?;
-
-                    file.rewind().map_err(|source| local::Error::Seek {
-                        source,
-                        path: path.clone(),
-                    })?;
+                        },
+                    )?;
 
-                    let mut buffer = Vec::with_capacity(len as usize);
-                    file.read_to_end(&mut buffer).map_err(|source| {
-                        local::Error::UnableToReadBytes { source, path }
-                    })?;
+                    let mut buffer = Vec::with_capacity(len);
+                    file.take(len as _)
+                        .read_to_end(&mut buffer)
+                        .map_err(|source| local::Error::UnableToReadBytes {
+                            source,
+                            path,
+                        })?;
 
                     Ok(buffer.into())
                 })
                 .await
             }
-            Self::Stream(s) => collect_bytes(s, None).await,
+            GetResultPayload::Stream(s) => collect_bytes(s, Some(len)).await,
             #[cfg(target_arch = "wasm32")]
             _ => unimplemented!("File IO not implemented on wasm32."),
         }
@@ -806,8 +804,8 @@ impl GetResult {
 
     /// Converts this into a byte stream
     ///
-    /// If the result is [`Self::File`] will perform chunked reads of the 
file, otherwise
-    /// will return the [`Self::Stream`].
+    /// If `self.payload` is [`GetResultPayload::File`], performs chunked reads of the
+    /// file; otherwise returns the [`GetResultPayload::Stream`] unchanged.
     ///
     /// # Tokio Compatibility
     ///
@@ -819,36 +817,13 @@ impl GetResult {
     /// If not called from a tokio context, this will perform IO on the 
current thread with
     /// no additional complexity or overheads
     pub fn into_stream(self) -> BoxStream<'static, Result<Bytes>> {
-        match self {
+        match self.payload {
             #[cfg(not(target_arch = "wasm32"))]
-            Self::File(file, path) => {
+            GetResultPayload::File(file, path) => {
                 const CHUNK_SIZE: usize = 8 * 1024;
-
-                futures::stream::try_unfold(
-                    (file, path, false),
-                    |(mut file, path, finished)| {
-                        maybe_spawn_blocking(move || {
-                            if finished {
-                                return Ok(None);
-                            }
-
-                            let mut buffer = Vec::with_capacity(CHUNK_SIZE);
-                            let read = file
-                                .by_ref()
-                                .take(CHUNK_SIZE as u64)
-                                .read_to_end(&mut buffer)
-                                .map_err(|e| local::Error::UnableToReadBytes {
-                                    source: e,
-                                    path: path.clone(),
-                                })?;
-
-                            Ok(Some((buffer.into(), (file, path, read != 
CHUNK_SIZE))))
-                        })
-                    },
-                )
-                .boxed()
+                local::chunked_stream(file, path, self.range, CHUNK_SIZE)
             }
-            Self::Stream(s) => s,
+            GetResultPayload::Stream(s) => s,
             #[cfg(target_arch = "wasm32")]
             _ => unimplemented!("File IO not implemented on wasm32."),
         }
diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs
index 630fd145b7..a9b8c4b050 100644
--- a/object_store/src/limit.rs
+++ b/object_store/src/limit.rs
@@ -18,8 +18,8 @@
 //! An object store that limits the maximum concurrency of the wrapped 
implementation
 
 use crate::{
-    BoxStream, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, 
ObjectStore,
-    Path, Result, StreamExt,
+    BoxStream, GetOptions, GetResult, GetResultPayload, ListResult, 
MultipartId,
+    ObjectMeta, ObjectStore, Path, Result, StreamExt,
 };
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -106,22 +106,14 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
 
     async fn get(&self, location: &Path) -> Result<GetResult> {
         let permit = 
Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
-        match self.inner.get(location).await? {
-            r @ GetResult::File(_, _) => Ok(r),
-            GetResult::Stream(s) => {
-                Ok(GetResult::Stream(PermitWrapper::new(s, permit).boxed()))
-            }
-        }
+        let r = self.inner.get(location).await?;
+        Ok(permit_get_result(r, permit))
     }
 
     async fn get_opts(&self, location: &Path, options: GetOptions) -> 
Result<GetResult> {
         let permit = 
Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
-        match self.inner.get_opts(location, options).await? {
-            r @ GetResult::File(_, _) => Ok(r),
-            GetResult::Stream(s) => {
-                Ok(GetResult::Stream(PermitWrapper::new(s, permit).boxed()))
-            }
-        }
+        let r = self.inner.get_opts(location, options).await?;
+        Ok(permit_get_result(r, permit))
     }
 
     async fn get_range(&self, location: &Path, range: Range<usize>) -> 
Result<Bytes> {
@@ -200,6 +192,16 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
     }
 }
 
+fn permit_get_result(r: GetResult, permit: OwnedSemaphorePermit) -> GetResult {
+    let payload = match r.payload {
+        v @ GetResultPayload::File(_, _) => v,
+        GetResultPayload::Stream(s) => {
+            GetResultPayload::Stream(PermitWrapper::new(s, permit).boxed())
+        }
+    };
+    GetResult { payload, ..r }
+}
+
 /// Combines an [`OwnedSemaphorePermit`] with some other type
 struct PermitWrapper<T> {
     inner: T,
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index a0933cc617..4d57ef1b79 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -19,16 +19,17 @@
 use crate::{
     maybe_spawn_blocking,
     path::{absolute_path_to_url, Path},
-    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, 
Result,
+    GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, 
ObjectMeta,
+    ObjectStore, Result,
 };
 use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
 use futures::future::BoxFuture;
-use futures::FutureExt;
 use futures::{stream::BoxStream, StreamExt};
+use futures::{FutureExt, TryStreamExt};
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
-use std::fs::{metadata, symlink_metadata, File, OpenOptions};
+use std::fs::{metadata, symlink_metadata, File, Metadata, OpenOptions};
 use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
 use std::ops::Range;
 use std::pin::Pin;
@@ -370,18 +371,20 @@ impl ObjectStore for LocalFileSystem {
         let location = location.clone();
         let path = self.config.path_to_filesystem(&location)?;
         maybe_spawn_blocking(move || {
-            let file = open_file(&path)?;
+            let (file, metadata) = open_file(&path)?;
             if options.if_unmodified_since.is_some()
                 || options.if_modified_since.is_some()
             {
-                let metadata = file.metadata().map_err(|e| Error::Metadata {
-                    source: e.into(),
-                    path: location.to_string(),
-                })?;
                 options.check_modified(&location, last_modified(&metadata))?;
             }
 
-            Ok(GetResult::File(file, path))
+            let meta = convert_metadata(metadata, location)?;
+
+            Ok(GetResult {
+                payload: GetResultPayload::File(file, path),
+                range: options.range.unwrap_or(0..meta.size),
+                meta,
+            })
         })
         .await
     }
@@ -389,7 +392,7 @@ impl ObjectStore for LocalFileSystem {
     async fn get_range(&self, location: &Path, range: Range<usize>) -> 
Result<Bytes> {
         let path = self.config.path_to_filesystem(location)?;
         maybe_spawn_blocking(move || {
-            let mut file = open_file(&path)?;
+            let (mut file, _) = open_file(&path)?;
             read_range(&mut file, &path, range)
         })
         .await
@@ -404,7 +407,7 @@ impl ObjectStore for LocalFileSystem {
         let ranges = ranges.to_vec();
         maybe_spawn_blocking(move || {
             // Vectored IO might be faster
-            let mut file = open_file(&path)?;
+            let (mut file, _) = open_file(&path)?;
             ranges
                 .into_iter()
                 .map(|r| read_range(&mut file, &path, r))
@@ -863,6 +866,51 @@ impl AsyncWrite for LocalUpload {
     }
 }
 
+pub(crate) fn chunked_stream(
+    mut file: File,
+    path: PathBuf,
+    range: Range<usize>,
+    chunk_size: usize,
+) -> BoxStream<'static, Result<Bytes, super::Error>> {
+    futures::stream::once(async move {
+        let (file, path) = maybe_spawn_blocking(move || {
+            file.seek(SeekFrom::Start(range.start as _))
+                .map_err(|source| Error::Seek {
+                    source,
+                    path: path.clone(),
+                })?;
+            Ok((file, path))
+        })
+        .await?;
+
+        let stream = futures::stream::try_unfold(
+            (file, path, range.end - range.start),
+            move |(mut file, path, remaining)| {
+                maybe_spawn_blocking(move || {
+                    if remaining == 0 {
+                        return Ok(None);
+                    }
+
+                    let to_read = remaining.min(chunk_size);
+                    let mut buffer = Vec::with_capacity(to_read);
+                    let read = (&mut file)
+                        .take(to_read as u64)
+                        .read_to_end(&mut buffer)
+                        .map_err(|e| Error::UnableToReadBytes {
+                            source: e,
+                            path: path.clone(),
+                        })?;
+
+                    Ok(Some((buffer.into(), (file, path, remaining - read))))
+                })
+            },
+        );
+        Ok::<_, super::Error>(stream)
+    })
+    .try_flatten()
+    .boxed()
+}
+
 pub(crate) fn read_range(
     file: &mut File,
     path: &PathBuf,
@@ -889,8 +937,8 @@ pub(crate) fn read_range(
     Ok(buf.into())
 }
 
-fn open_file(path: &PathBuf) -> Result<File> {
-    let file = match File::open(path).and_then(|f| Ok((f.metadata()?, f))) {
+fn open_file(path: &PathBuf) -> Result<(File, Metadata)> {
+    let ret = match File::open(path).and_then(|f| Ok((f.metadata()?, f))) {
         Err(e) => Err(match e.kind() {
             ErrorKind::NotFound => Error::NotFound {
                 path: path.clone(),
@@ -902,14 +950,14 @@ fn open_file(path: &PathBuf) -> Result<File> {
             },
         }),
         Ok((metadata, file)) => match !metadata.is_dir() {
-            true => Ok(file),
+            true => Ok((file, metadata)),
             false => Err(Error::NotFound {
                 path: path.clone(),
                 source: io::Error::new(ErrorKind::NotFound, "is directory"),
             }),
         },
     }?;
-    Ok(file)
+    Ok(ret)
 }
 
 fn convert_entry(entry: DirEntry, location: Path) -> Result<ObjectMeta> {
@@ -927,7 +975,7 @@ fn last_modified(metadata: &std::fs::Metadata) -> 
DateTime<Utc> {
         .into()
 }
 
-fn convert_metadata(metadata: std::fs::Metadata, location: Path) -> 
Result<ObjectMeta> {
+fn convert_metadata(metadata: Metadata, location: Path) -> Result<ObjectMeta> {
     let last_modified = last_modified(&metadata);
     let size = 
usize::try_from(metadata.len()).context(FileSizeOverflowedUsizeSnafu {
         path: location.as_ref(),
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index cfc2ac8230..1e8e3c1fd0 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -16,7 +16,9 @@
 // under the License.
 
 //! An in-memory object store implementation
-use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, 
Result};
+use crate::{
+    path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, 
ObjectStore, Result,
+};
 use crate::{GetOptions, MultipartId};
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -43,11 +45,13 @@ enum Error {
     #[snafu(display("No data in memory found. Location: {path}"))]
     NoDataInMemory { path: String },
 
-    #[snafu(display("Out of range"))]
-    OutOfRange,
+    #[snafu(display(
+        "Requested range {}..{} is out of bounds for object with length {}", 
range.start, range.end, len
+    ))]
+    OutOfRange { range: Range<usize>, len: usize },
 
-    #[snafu(display("Bad range"))]
-    BadRange,
+    #[snafu(display("Invalid range: {}..{}", range.start, range.end))]
+    BadRange { range: Range<usize> },
 
     #[snafu(display("Object already exists at that location: {path}"))]
     AlreadyExists { path: String },
@@ -136,17 +140,29 @@ impl ObjectStore for InMemory {
         }
         let (data, last_modified) = self.entry(location).await?;
         options.check_modified(location, last_modified)?;
+        let meta = ObjectMeta {
+            location: location.clone(),
+            last_modified,
+            size: data.len(),
+            e_tag: None,
+        };
 
+        let (range, data) = match options.range {
+            Some(range) => {
+                let len = data.len();
+                ensure!(range.end <= len, OutOfRangeSnafu { range, len });
+                ensure!(range.start <= range.end, BadRangeSnafu { range });
+                (range.clone(), data.slice(range))
+            }
+            None => (0..data.len(), data),
+        };
         let stream = futures::stream::once(futures::future::ready(Ok(data)));
-        Ok(GetResult::Stream(stream.boxed()))
-    }
-
-    async fn get_range(&self, location: &Path, range: Range<usize>) -> 
Result<Bytes> {
-        let data = self.entry(location).await?;
-        ensure!(range.end <= data.0.len(), OutOfRangeSnafu);
-        ensure!(range.start <= range.end, BadRangeSnafu);
 
-        Ok(data.0.slice(range))
+        Ok(GetResult {
+            payload: GetResultPayload::Stream(stream.boxed()),
+            meta,
+            range,
+        })
     }
 
     async fn get_ranges(
@@ -158,9 +174,11 @@ impl ObjectStore for InMemory {
         ranges
             .iter()
             .map(|range| {
-                ensure!(range.end <= data.0.len(), OutOfRangeSnafu);
-                ensure!(range.start <= range.end, BadRangeSnafu);
-                Ok(data.0.slice(range.clone()))
+                let range = range.clone();
+                let len = data.0.len();
+                ensure!(range.end <= data.0.len(), OutOfRangeSnafu { range, 
len });
+                ensure!(range.start <= range.end, BadRangeSnafu { range });
+                Ok(data.0.slice(range))
             })
             .collect()
     }
diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs
index fb90afcec9..58c476ab45 100644
--- a/object_store/src/throttle.rs
+++ b/object_store/src/throttle.rs
@@ -20,7 +20,9 @@ use parking_lot::Mutex;
 use std::ops::Range;
 use std::{convert::TryInto, sync::Arc};
 
-use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, 
Result};
+use crate::{
+    path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, 
ObjectStore, Result,
+};
 use crate::{GetOptions, MultipartId};
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -301,15 +303,20 @@ fn usize_to_u32_saturate(x: usize) -> u32 {
 }
 
 fn throttle_get(result: GetResult, wait_get_per_byte: Duration) -> GetResult {
-    let s = match result {
-        GetResult::Stream(s) => s,
-        GetResult::File(_, _) => unimplemented!(),
+    let s = match result.payload {
+        GetResultPayload::Stream(s) => s,
+        GetResultPayload::File(_, _) => unimplemented!(),
     };
 
-    GetResult::Stream(throttle_stream(s, move |bytes| {
+    let stream = throttle_stream(s, move |bytes| {
         let bytes_len: u32 = usize_to_u32_saturate(bytes.len());
         wait_get_per_byte * bytes_len
-    }))
+    });
+
+    GetResult {
+        payload: GetResultPayload::Stream(stream),
+        ..result
+    }
 }
 
 fn throttle_stream<T: Send + 'static, E: Send + 'static, F>(
@@ -330,7 +337,7 @@ where
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::{memory::InMemory, tests::*};
+    use crate::{memory::InMemory, tests::*, GetResultPayload};
     use bytes::Bytes;
     use futures::TryStreamExt;
     use tokio::time::Duration;
@@ -550,9 +557,9 @@ mod tests {
         let res = store.get(&path).await;
         if n_bytes.is_some() {
             // need to consume bytes to provoke sleep times
-            let s = match res.unwrap() {
-                GetResult::Stream(s) => s,
-                GetResult::File(_, _) => unimplemented!(),
+            let s = match res.unwrap().payload {
+                GetResultPayload::Stream(s) => s,
+                GetResultPayload::File(_, _) => unimplemented!(),
             };
 
             s.map_ok(|b| bytes::BytesMut::from(&b[..]))

Reply via email to