alamb commented on code in PR #4677:
URL: https://github.com/apache/arrow-rs/pull/4677#discussion_r1291185652
##########
object_store/src/local.rs:
##########
@@ -863,7 +866,56 @@ impl AsyncWrite for LocalUpload {
}
}
-pub(crate) fn read_range(file: &mut File, path: &PathBuf, range: Range<usize>) -> Result<Bytes> {
+pub(crate) fn chunked_stream(
+    mut file: File,
+    path: PathBuf,
+    range: Range<usize>,
+    chunk_size: usize,
+) -> BoxStream<'static, Result<Bytes, super::Error>> {
+    futures::stream::once(async move {
+        let (file, path) = maybe_spawn_blocking(move || {
+            file.seek(SeekFrom::Start(range.start as _))
+                .map_err(|source| Error::Seek {
+                    source,
+                    path: path.clone(),
+                })?;
+            Ok((file, path))
+        })
+        .await?;
+
+        let stream = futures::stream::try_unfold(
Review Comment:
Do you know if the object_store tests have coverage for files larger than 8KB?
In other words, is this code path covered by tests?
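For reference, something like the following rough sketch would exercise the
multi-chunk path (the test name and the use of the `tempfile` dev-dependency
are my own, not part of this PR):
```rust
// Rough sketch of a test reading more than one 8KB chunk; assumes the existing
// LocalFileSystem / ObjectStore APIs and the tempfile dev-dependency.
#[tokio::test]
async fn get_file_larger_than_one_chunk() {
    use futures::TryStreamExt;
    use object_store::{local::LocalFileSystem, path::Path, ObjectStore};

    let root = tempfile::tempdir().unwrap();
    let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
    let location = Path::from("large.bin");

    // Three chunks' worth of data at the default 8KB chunk size
    let data = vec![42_u8; 3 * 8 * 1024];
    store.put(&location, data.clone().into()).await.unwrap();

    // Drain the chunked stream and check we got every byte back
    let chunks: Vec<_> = store
        .get(&location)
        .await
        .unwrap()
        .into_stream()
        .try_collect()
        .await
        .unwrap();
    assert!(chunks.len() > 1, "expected more than one chunk");
    let total: usize = chunks.iter().map(|c| c.len()).sum();
    assert_eq!(total, data.len());
}
```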
##########
object_store/src/lib.rs:
##########
@@ -729,54 +719,64 @@ impl GetOptions {
}
/// Result for a get request
+#[derive(Debug)]
+pub struct GetResult {
Review Comment:
What is the reason for making the fields in this struct `pub`? If they are all
`pub`, we can't add fields to `GetResult` in the future (such as optional
object-store-specific metadata) without it being a breaking change.
What do you think about leaving the fields non-`pub` and adding accessors,
plus a
```rust
fn into_parts(self) -> (GetResultPayload, ObjectMeta) {
...
}
```
🤔
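For concreteness, a rough sketch of what I mean (the accessor names are just a
suggestion, not from this PR; this assumes the surrounding `lib.rs` context
where `Range`, `ObjectMeta`, and `GetResultPayload` are in scope):
```rust
// Sketch only: private fields, read accessors, and a consuming `into_parts`,
// so new fields can be added later without a breaking change.
#[derive(Debug)]
pub struct GetResult {
    payload: GetResultPayload,
    meta: ObjectMeta,
    range: Range<usize>,
}

impl GetResult {
    /// The [`GetResultPayload`] for this request
    pub fn payload(&self) -> &GetResultPayload {
        &self.payload
    }

    /// The [`ObjectMeta`] for this object
    pub fn meta(&self) -> &ObjectMeta {
        &self.meta
    }

    /// The range of bytes returned by this request
    pub fn range(&self) -> Range<usize> {
        self.range.clone()
    }

    /// Splits this result into its payload and metadata
    pub fn into_parts(self) -> (GetResultPayload, ObjectMeta) {
        (self.payload, self.meta)
    }
}
```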
##########
object_store/src/local.rs:
##########
@@ -863,7 +866,56 @@ impl AsyncWrite for LocalUpload {
}
}
-pub(crate) fn read_range(file: &mut File, path: &PathBuf, range: Range<usize>) -> Result<Bytes> {
+pub(crate) fn chunked_stream(
+    mut file: File,
+    path: PathBuf,
+    range: Range<usize>,
+    chunk_size: usize,
+) -> BoxStream<'static, Result<Bytes, super::Error>> {
+    futures::stream::once(async move {
Review Comment:
I was wondering about using
[tokio::fs](https://docs.rs/tokio/latest/tokio/fs/index.html), but it seems
like the warnings on that page are still fairly significant.
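For the record, the shape I had in mind was roughly the following. This is
illustrative only: as far as I can tell `tokio-util` (for `ReaderStream`) is
not a dependency of `object_store` today, and `tokio::fs` itself dispatches to
`spawn_blocking` under the hood, which is what those warnings are about:
```rust
// Illustrative only: a tokio::fs based equivalent of the seek + chunked read.
// Assumes the tokio-util crate, which object_store does not currently use.
use tokio::io::{AsyncReadExt, AsyncSeekExt, SeekFrom};
use tokio_util::io::ReaderStream;

async fn chunked_stream_tokio(
    file: std::fs::File,
    range: std::ops::Range<u64>,
    chunk_size: usize,
) -> std::io::Result<ReaderStream<tokio::io::Take<tokio::fs::File>>> {
    let mut file = tokio::fs::File::from_std(file);
    // Seek to the start of the requested range
    file.seek(SeekFrom::Start(range.start)).await?;
    // Limit reads to the requested range, then stream it in chunk_size pieces
    let limited = file.take(range.end - range.start);
    Ok(ReaderStream::with_capacity(limited, chunk_size))
}
```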
##########
object_store/src/lib.rs:
##########
@@ -729,54 +719,64 @@ impl GetOptions {
}
/// Result for a get request
+#[derive(Debug)]
+pub struct GetResult {
+    /// The [`GetResultPayload`]
+    pub payload: GetResultPayload,
+    /// The [`ObjectMeta`] for this object
+    pub meta: ObjectMeta,
+    /// The range of bytes returned by this request
+    pub range: Range<usize>,
Review Comment:
The alternative would be to make it optional (and read the file length
directly from the metadata when it is `None`)?
If that is the tradeoff, I agree that always including the sometimes-redundant
`range` is a good choice.
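For concreteness, the optional variant would have looked something like this
(illustrative only, not something I am proposing):
```rust
// Illustrative only: the alternative where `range` is optional and `None`
// means "the whole object", recovered from the metadata on demand.
pub struct GetResult {
    pub payload: GetResultPayload,
    pub meta: ObjectMeta,
    /// `None` means the full object, i.e. `0..meta.size`
    pub range: Option<Range<usize>>,
}

impl GetResult {
    /// The effective byte range, falling back to the object length from `meta`
    pub fn effective_range(&self) -> Range<usize> {
        self.range.clone().unwrap_or(0..self.meta.size)
    }
}
```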
##########
object_store/src/memory.rs:
##########
@@ -136,17 +138,28 @@ impl ObjectStore for InMemory {
        }
        let (data, last_modified) = self.entry(location).await?;
        options.check_modified(location, last_modified)?;
+        let meta = ObjectMeta {
+            location: location.clone(),
+            last_modified,
+            size: data.len(),
+            e_tag: None,
+        };
+        let (range, data) = match options.range {
+            Some(range) => {
+                ensure!(range.end <= data.len(), OutOfRangeSnafu);
Review Comment:
It occurs to me that these errors would be improved if they included the
ranges and lengths as values. I understand that this PR doesn't change that
behavior.
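Not something for this PR, but as a sketch of what I mean, the variant could
carry the offending values so the rendered message shows both the requested
range and the object length (the exact shape here is illustrative):
```rust
// Illustrative sketch: an error variant that carries the values, so the
// rendered message includes both the requested range and the object length.
use snafu::{ensure, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display(
        "Requested range {}..{} is out of bounds for object of length {}",
        range.start,
        range.end,
        len
    ))]
    OutOfRange {
        range: std::ops::Range<usize>,
        len: usize,
    },
}

fn check_range(range: &std::ops::Range<usize>, data: &[u8]) -> Result<(), Error> {
    // Pass the values into the context selector so they end up in the message
    ensure!(
        range.end <= data.len(),
        OutOfRangeSnafu {
            range: range.clone(),
            len: data.len(),
        }
    );
    Ok(())
}
```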
##########
object_store/src/lib.rs:
##########
@@ -797,36 +797,12 @@ impl GetResult {
    /// If not called from a tokio context, this will perform IO on the current thread with
    /// no additional complexity or overheads
    pub fn into_stream(self) -> BoxStream<'static, Result<Bytes>> {
-        match self {
+        match self.payload {
            #[cfg(not(target_arch = "wasm32"))]
-            Self::File(file, path) => {
-                const CHUNK_SIZE: usize = 8 * 1024;
-
-                futures::stream::try_unfold(
-                    (file, path, false),
-                    |(mut file, path, finished)| {
-                        maybe_spawn_blocking(move || {
-                            if finished {
-                                return Ok(None);
-                            }
-
-                            let mut buffer = Vec::with_capacity(CHUNK_SIZE);
-                            let read = file
-                                .by_ref()
-                                .take(CHUNK_SIZE as u64)
-                                .read_to_end(&mut buffer)
-                                .map_err(|e| local::Error::UnableToReadBytes {
-                                    source: e,
-                                    path: path.clone(),
-                                })?;
-
-                            Ok(Some((buffer.into(), (file, path, read != CHUNK_SIZE))))
-                        })
-                    },
-                )
-                .boxed()
+            GetResultPayload::File(file, path) => {
+                local::chunked_stream(file, path, self.range, 8 * 1024)
Review Comment:
I think keeping the `CHUNK_SIZE` name for the `8 * 1024` here would improve
this code's readability.
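i.e. roughly (a fragment of the same match arm shown above, not a complete
snippet):
```rust
// Sketch of the suggestion: reintroduce the named constant at this call site.
const CHUNK_SIZE: usize = 8 * 1024;

// ... then in the match arm above:
// GetResultPayload::File(file, path) => {
//     local::chunked_stream(file, path, self.range, CHUNK_SIZE)
// }
```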
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]