This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch lazy-reader in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit c88362cf94838c2afc6488215dcd5fb0b252db86 Author: Xuanwo <[email protected]> AuthorDate: Thu Oct 26 21:18:18 2023 +0800 Save Signed-off-by: Xuanwo <[email protected]> --- core/src/layers/complete.rs | 41 +++--- core/src/layers/madsim.rs | 5 +- core/src/layers/prometheus.rs | 38 ++--- core/src/layers/retry.rs | 2 +- core/src/raw/adapters/kv/backend.rs | 5 +- core/src/raw/adapters/typed_kv/backend.rs | 5 +- core/src/raw/oio/read/api.rs | 130 ++++++++++++++++- core/src/raw/oio/read/lazy_read.rs | 198 ++++++++++++++++++++++++++ core/src/raw/oio/read/mod.rs | 3 + core/src/raw/oio/read/range_read.rs | 2 +- core/src/raw/rps.rs | 25 +--- core/src/services/azblob/backend.rs | 6 +- core/src/services/azdls/backend.rs | 5 +- core/src/services/azfile/backend.rs | 5 +- core/src/services/cos/backend.rs | 5 +- core/src/services/dropbox/backend.rs | 5 +- core/src/services/fs/backend.rs | 4 +- core/src/services/ftp/backend.rs | 14 +- core/src/services/gcs/backend.rs | 3 +- core/src/services/gdrive/backend.rs | 18 +-- core/src/services/ghac/backend.rs | 5 +- core/src/services/hdfs/backend.rs | 4 +- core/src/services/http/backend.rs | 5 +- core/src/services/ipfs/backend.rs | 5 +- core/src/services/ipmfs/backend.rs | 5 +- core/src/services/obs/backend.rs | 5 +- core/src/services/onedrive/backend.rs | 5 +- core/src/services/oss/backend.rs | 5 +- core/src/services/s3/backend.rs | 5 +- core/src/services/sftp/backend.rs | 2 +- core/src/services/supabase/backend.rs | 5 +- core/src/services/vercel_artifacts/backend.rs | 5 +- core/src/services/wasabi/backend.rs | 5 +- core/src/services/webdav/backend.rs | 5 +- core/src/services/webhdfs/backend.rs | 5 +- core/src/types/operator/blocking_operator.rs | 23 +-- core/src/types/operator/operator.rs | 33 +---- 37 files changed, 417 insertions(+), 229 deletions(-) diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 903b64e6b..0a0a0bd3f 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -27,13 
+27,13 @@ use std::task::Poll; use async_trait::async_trait; use bytes::Bytes; -use crate::raw::oio::into_flat_page; -use crate::raw::oio::into_hierarchy_page; use crate::raw::oio::Entry; use crate::raw::oio::FlatPager; use crate::raw::oio::HierarchyPager; use crate::raw::oio::RangeReader; use crate::raw::oio::StreamableReader; +use crate::raw::oio::{into_flat_page, FileReader}; +use crate::raw::oio::{into_hierarchy_page, LazyReader}; use crate::raw::*; use crate::*; @@ -162,22 +162,24 @@ impl<A: Accessor> CompleteAccessor<A> { let seekable = capability.read_can_seek; let streamable = capability.read_can_next; - let (rp, r) = self.inner.read(path, args.clone()).await?; - match (seekable, streamable) { - (true, true) => Ok((rp, CompleteReader::AlreadyComplete(r))), + (true, true) => { + let r = LazyReader::new(self.inner.clone(), path, args); + Ok((RpRead::new(), CompleteReader::AlreadyComplete(r))) + } (true, false) => { - let r = oio::into_streamable_read(r, 256 * 1024); - Ok((rp, CompleteReader::NeedStreamable(r))) + let r = FileReader::new(self.inner.clone(), path, args); + + Ok((RpRead::new(), CompleteReader::NeedStreamable(r))) } _ => { let r = RangeReader::new(self.inner.clone(), path, args); if streamable { - Ok((rp, CompleteReader::NeedSeekable(r))) + Ok((RpRead::new(), CompleteReader::NeedSeekable(r))) } else { let r = oio::into_streamable_read(r, 256 * 1024); - Ok((rp, CompleteReader::NeedBoth(r))) + Ok((RpRead::new(), CompleteReader::NeedBoth(r))) } } } @@ -196,22 +198,23 @@ impl<A: Accessor> CompleteAccessor<A> { let seekable = capability.read_can_seek; let streamable = capability.read_can_next; - let (rp, r) = self.inner.blocking_read(path, args.clone())?; - match (seekable, streamable) { - (true, true) => Ok((rp, CompleteReader::AlreadyComplete(r))), + (true, true) => { + let r = LazyReader::new(self.inner.clone(), path, args); + Ok((RpRead::new(), CompleteReader::AlreadyComplete(r))) + } (true, false) => { - let r = oio::into_streamable_read(r, 256 * 
1024); - Ok((rp, CompleteReader::NeedStreamable(r))) + let r = FileReader::new(self.inner.clone(), path, args); + Ok((RpRead::new(), CompleteReader::NeedStreamable(r))) } _ => { let r = RangeReader::new(self.inner.clone(), path, args); if streamable { - Ok((rp, CompleteReader::NeedSeekable(r))) + Ok((RpRead::new(), CompleteReader::NeedSeekable(r))) } else { let r = oio::into_streamable_read(r, 256 * 1024); - Ok((rp, CompleteReader::NeedBoth(r))) + Ok((RpRead::new(), CompleteReader::NeedBoth(r))) } } } @@ -546,9 +549,9 @@ impl<A: Accessor> LayeredAccessor for CompleteAccessor<A> { } pub enum CompleteReader<A: Accessor, R> { - AlreadyComplete(R), + AlreadyComplete(LazyReader<A, R>), NeedSeekable(RangeReader<A, R>), - NeedStreamable(StreamableReader<R>), + NeedStreamable(FileReader<A, R>), NeedBoth(StreamableReader<RangeReader<A, R>>), } @@ -788,7 +791,7 @@ mod tests { } async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> { - Ok((RpRead::new(0), Box::new(()))) + Ok((RpRead::new(), Box::new(()))) } async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs index 6e2a3b6a3..1d1253baf 100644 --- a/core/src/layers/madsim.rs +++ b/core/src/layers/madsim.rs @@ -191,10 +191,7 @@ impl LayeredAccessor for MadsimAccessor { .downcast::<ReadResponse>() .expect("fail to downcast response to ReadResponse"); let content_length = resp.data.as_ref().map(|b| b.len()).unwrap_or(0); - Ok(( - RpRead::new(content_length as u64), - MadsimReader { data: resp.data }, - )) + Ok((RpRead::new(), MadsimReader { data: resp.data })) } #[cfg(not(madsim))] { diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 4692da0a6..f26a4af2b 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -320,28 +320,18 @@ impl<A: Accessor> LayeredAccessor for PrometheusAccessor<A> { .with_label_values(&labels) .start_timer(); - let read_res = self - .inner 
- .read(path, args) - .map(|v| { - v.map(|(rp, r)| { - self.stats - .bytes_total - .with_label_values(&labels) - .observe(rp.metadata().content_length() as f64); - ( - rp, - PrometheusMetricWrapper::new( - r, - Operation::Read, - self.stats.clone(), - self.scheme, - &path.to_string(), - ), - ) - }) - }) - .await; + let read_res = self.inner.read(path, args).await.map(|(rp, r)| { + ( + rp, + PrometheusMetricWrapper::new( + r, + Operation::Read, + self.stats.clone(), + self.scheme, + &path.to_string(), + ), + ) + }); timer.observe_duration(); read_res.map_err(|e| { self.stats.increment_errors_total(Operation::Read, e.kind()); @@ -546,10 +536,6 @@ impl<A: Accessor> LayeredAccessor for PrometheusAccessor<A> { .with_label_values(&labels) .start_timer(); let result = self.inner.blocking_read(path, args).map(|(rp, r)| { - self.stats - .bytes_total - .with_label_values(&labels) - .observe(rp.metadata().content_length() as f64); ( rp, PrometheusMetricWrapper::new( diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index 67239d3fd..9ae6c877d 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -1164,7 +1164,7 @@ mod tests { async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> { Ok(( - RpRead::new(13), + RpRead::new(), MockReader { attempt: self.attempt.clone(), pos: 0, diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index 94db4de8d..799dd1be5 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -129,8 +129,7 @@ impl<S: Adapter> Accessor for Backend<S> { let bs = self.apply_range(bs, args.range()); - let length = bs.len(); - Ok((RpRead::new(length as u64), oio::Cursor::from(bs))) + Ok((RpRead::new(), oio::Cursor::from(bs))) } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { @@ -142,7 +141,7 @@ impl<S: Adapter> Accessor for Backend<S> { }; let bs = self.apply_range(bs, args.range()); - 
Ok((RpRead::new(bs.len() as u64), oio::Cursor::from(bs))) + Ok((RpRead::new(), oio::Cursor::from(bs))) } async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index d5313b8e0..ca872346a 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -135,8 +135,7 @@ impl<S: Adapter> Accessor for Backend<S> { let bs = self.apply_range(bs, args.range()); - let length = bs.len(); - Ok((RpRead::new(length as u64), oio::Cursor::from(bs))) + Ok((RpRead::new(), oio::Cursor::from(bs))) } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { @@ -149,7 +148,7 @@ impl<S: Adapter> Accessor for Backend<S> { }; let bs = self.apply_range(bs, args.range()); - Ok((RpRead::new(bs.len() as u64), oio::Cursor::from(bs))) + Ok((RpRead::new(), oio::Cursor::from(bs))) } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { diff --git a/core/src/raw/oio/read/api.rs b/core/src/raw/oio/read/api.rs index 79fd77c33..436c8447f 100644 --- a/core/src/raw/oio/read/api.rs +++ b/core/src/raw/oio/read/api.rs @@ -17,10 +17,10 @@ use std::fmt::Display; use std::fmt::Formatter; -use std::io; use std::pin::Pin; -use std::task::Context; use std::task::Poll; +use std::task::{ready, Context}; +use std::{cmp, io}; use bytes::Bytes; use futures::Future; @@ -198,6 +198,16 @@ pub trait ReadExt: Read { fn next(&mut self) -> NextFuture<'_, Self> { NextFuture { reader: self } } + + /// Build a future for `read_to_end`. + fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEndFuture<'a, Self> { + let start = buf.len(); + ReadToEndFuture { + reader: self, + buf, + start, + } + } } /// Make this future `!Unpin` for compatibility with async trait methods. @@ -256,6 +266,82 @@ where } } +/// Make this future `!Unpin` for compatibility with async trait methods. 
+#[pin_project(!Unpin)] +pub struct ReadToEndFuture<'a, R: Read + Unpin + ?Sized> { + reader: &'a mut R, + buf: &'a mut Vec<u8>, + start: usize, +} + +impl<R> Future for ReadToEndFuture<'_, R> +where + R: Read + Unpin + ?Sized, +{ + type Output = Result<usize>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<usize>> { + let this = self.project(); + + let mut g = ReadToEndGuard { + len: this.buf.len(), + buf: this.buf, + next: MIN_READ_TO_END_GROW_SIZE, + }; + + loop { + if g.buf.capacity() - g.buf.len() < g.next { + g.buf.reserve(g.next); + unsafe { + g.buf.set_len(g.buf.capacity()); + } + } + + let buf = &mut g.buf[g.len..]; + match ready!(this.reader.poll_read(cx, buf)) { + Ok(0) => return Poll::Ready(Ok(g.len - *this.start)), + Ok(n) => { + g.next = if n >= g.next { + cmp::min(g.next.saturating_mul(2), MAX_READ_TO_END_GROW_SIZE) + } else if n >= g.next / 2 { + g.next + } else { + cmp::max(g.next.saturating_div(2), MIN_READ_TO_END_GROW_SIZE) + }; + // We can't allow bogus values from read. If it is too large, the returned vec could have its length + // set past its capacity, or if it overflows the vec could be shortened which could create an invalid + // string if this is called via read_to_string. + assert!(n <= buf.len()); + g.len += n; + } + Err(e) => return Poll::Ready(Err(e)), + } + } + } +} + +const MIN_READ_TO_END_GROW_SIZE: usize = 8 * 1024; +const MAX_READ_TO_END_GROW_SIZE: usize = 4 * 1024 * 1024; + +/// ReadToEndGuard makes sure that the buf length is maintained correctly. +struct ReadToEndGuard<'a> { + buf: &'a mut Vec<u8>, + /// Store the real length of buf. + len: usize, + next: usize, +} + +impl Drop for ReadToEndGuard<'_> { + /// # Safety + /// + /// We make sure that the length of buf is maintained correctly. + fn drop(&mut self) { + unsafe { + self.buf.set_len(self.len); + } + } +} + /// BlockingReader is a boxed dyn `BlockingRead`. 
pub type BlockingReader = Box<dyn BlockingRead>; @@ -278,6 +364,46 @@ pub trait BlockingRead: Send + Sync { /// Iterating [`Bytes`] from underlying reader. fn next(&mut self) -> Option<Result<Bytes>>; + + /// Read all data of current reader to the end of buf. + fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize> { + let start_len = buf.len(); + let mut g = ReadToEndGuard { + len: buf.len(), + buf, + next: MIN_READ_TO_END_GROW_SIZE, + }; + + loop { + if g.buf.capacity() - g.buf.len() < g.next { + g.buf.reserve(g.next); + unsafe { + g.buf.set_len(g.buf.capacity()); + } + } + + let buf = &mut g.buf[g.len..]; + match self.read(buf) { + Ok(0) => return Ok(g.len - start_len), + Ok(n) => { + g.next = if n >= g.next { + cmp::min(g.next.saturating_mul(2), MAX_READ_TO_END_GROW_SIZE) + } else if n >= g.next / 2 { + g.next + } else { + cmp::max(g.next.saturating_div(2), MIN_READ_TO_END_GROW_SIZE) + }; + + // We can't allow bogus values from read. If it is too large, the returned vec could have its length + // set past its capacity, or if it overflows the vec could be shortened which could create an invalid + // string if this is called via read_to_string. + assert!(n <= buf.len()); + g.len += n; + } + Err(e) => return Err(e), + } + } + } } impl BlockingRead for () { diff --git a/core/src/raw/oio/read/lazy_read.rs b/core/src/raw/oio/read/lazy_read.rs new file mode 100644 index 000000000..89705deff --- /dev/null +++ b/core/src/raw/oio/read/lazy_read.rs @@ -0,0 +1,198 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::raw::*; +use crate::*; +use bytes::Bytes; +use futures::future::BoxFuture; +use futures::Future; +use std::io::SeekFrom; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{ready, Context, Poll}; + +/// LazyReader implements [`oio::Read`] in a lazy way. +/// +/// The real requests are sent when users call read or seek. +pub struct LazyReader<A: Accessor, R> { + acc: Arc<A>, + path: Arc<String>, + op: OpRead, + state: State<R>, +} + +enum State<R> { + Idle, + Send(BoxFuture<'static, Result<(RpRead, R)>>), + Read(R), +} + +/// Safety: State will only be accessed under &mut. +unsafe impl<R> Sync for State<R> {} + +impl<A, R> LazyReader<A, R> +where + A: Accessor, +{ + /// Create a new [`oio::Reader`] with lazy support. 
+ pub fn new(acc: Arc<A>, path: &str, op: OpRead) -> LazyReader<A, R> { + LazyReader { + acc, + path: Arc::new(path.to_string()), + op, + + state: State::<R>::Idle, + } + } +} + +impl<A, R> LazyReader<A, R> +where + A: Accessor<Reader = R>, + R: oio::Read, +{ + fn read_future(&self) -> BoxFuture<'static, Result<(RpRead, R)>> { + let acc = self.acc.clone(); + let path = self.path.clone(); + let op = self.op.clone(); + + Box::pin(async move { acc.read(&path, op).await }) + } +} + +impl<A, R> oio::Read for LazyReader<A, R> +where + A: Accessor<Reader = R>, + R: oio::Read, +{ + fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> { + match &mut self.state { + State::Idle => { + self.state = State::Send(self.read_future()); + self.poll_read(cx, buf) + } + State::Send(fut) => { + let (_, r) = ready!(Pin::new(fut).poll(cx)).map_err(|err| { + // If read future returns an error, we should reset + // state to Idle so that we can retry it. + self.state = State::Idle; + err + })?; + self.state = State::Read(r); + self.poll_read(cx, buf) + } + State::Read(r) => r.poll_read(cx, buf), + } + } + + fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> { + match &mut self.state { + State::Idle => { + self.state = State::Send(self.read_future()); + self.poll_seek(cx, pos) + } + State::Send(fut) => { + let (_, r) = ready!(Pin::new(fut).poll(cx)).map_err(|err| { + // If read future returns an error, we should reset + // state to Idle so that we can retry it. 
+ self.state = State::Idle; + err + })?; + self.state = State::Read(r); + self.poll_seek(cx, pos) + } + State::Read(r) => r.poll_seek(cx, pos), + } + } + + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { + match &mut self.state { + State::Idle => { + self.state = State::Send(self.read_future()); + self.poll_next(cx) + } + State::Send(fut) => { + let (_, r) = ready!(Pin::new(fut).poll(cx)).map_err(|err| { + // If read future returns an error, we should reset + // state to Idle so that we can retry it. + self.state = State::Idle; + err + })?; + self.state = State::Read(r); + self.poll_next(cx) + } + State::Read(r) => r.poll_next(cx), + } + } +} + +impl<A, R> oio::BlockingRead for LazyReader<A, R> +where + A: Accessor<BlockingReader = R>, + R: oio::BlockingRead, +{ + fn read(&mut self, buf: &mut [u8]) -> Result<usize> { + match &mut self.state { + State::Idle => { + let (_, r) = self.acc.blocking_read(&self.path, self.op.clone())?; + self.state = State::Read(r); + self.read(buf) + } + State::Read(r) => r.read(buf), + State::Send(_) => { + unreachable!( + "It's invalid to go into State::Send for BlockingRead, please report this bug" + ) + } + } + } + + fn seek(&mut self, pos: SeekFrom) -> Result<u64> { + match &mut self.state { + State::Idle => { + let (_, r) = self.acc.blocking_read(&self.path, self.op.clone())?; + self.state = State::Read(r); + self.seek(pos) + } + State::Read(r) => r.seek(pos), + State::Send(_) => { + unreachable!( + "It's invalid to go into State::Send for BlockingRead, please report this bug" + ) + } + } + } + + fn next(&mut self) -> Option<Result<Bytes>> { + match &mut self.state { + State::Idle => { + let r = match self.acc.blocking_read(&self.path, self.op.clone()) { + Ok((_, r)) => r, + Err(err) => return Some(Err(err)), + }; + self.state = State::Read(r); + self.next() + } + State::Read(r) => r.next(), + State::Send(_) => { + unreachable!( + "It's invalid to go into State::Send for BlockingRead, please report 
this bug" + ) + } + } + } +} diff --git a/core/src/raw/oio/read/mod.rs b/core/src/raw/oio/read/mod.rs index 16510636a..5f7d5d93a 100644 --- a/core/src/raw/oio/read/mod.rs +++ b/core/src/raw/oio/read/mod.rs @@ -45,3 +45,6 @@ pub use tokio_read::TokioReader; mod std_read; pub use std_read::StdReader; + +mod lazy_read; +pub use lazy_read::LazyReader; diff --git a/core/src/raw/oio/read/range_read.rs b/core/src/raw/oio/read/range_read.rs index 870285e42..e6967cea5 100644 --- a/core/src/raw/oio/read/range_read.rs +++ b/core/src/raw/oio/read/range_read.rs @@ -637,7 +637,7 @@ mod tests { let bs = args.range().apply_on_bytes(self.data.clone()); Ok(( - RpRead::new(bs.len() as u64), + RpRead::new(), MockReader { inner: futures::io::Cursor::new(bs.into()), }, diff --git a/core/src/raw/rps.rs b/core/src/raw/rps.rs index bc45d1457..17e4470ae 100644 --- a/core/src/raw/rps.rs +++ b/core/src/raw/rps.rs @@ -98,31 +98,12 @@ impl<T: Default> From<PresignedRequest> for Request<T> { /// Reply for `read` operation. #[derive(Debug, Clone)] -pub struct RpRead { - meta: Metadata, -} +pub struct RpRead {} impl RpRead { /// Create a new reply for `read`. - pub fn new(content_length: u64) -> Self { - RpRead { - meta: Metadata::new(EntryMode::FILE).with_content_length(content_length), - } - } - - /// Create reply read with existing metadata. - pub fn with_metadata(meta: Metadata) -> Self { - RpRead { meta } - } - - /// Get a ref of metadata. - pub fn metadata(&self) -> &Metadata { - &self.meta - } - - /// Consume reply to get the meta. 
- pub fn into_metadata(self) -> Metadata { - self.meta + pub fn new() -> Self { + RpRead {} } } diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs index de6d5e3e9..375a375d8 100644 --- a/core/src/services/azblob/backend.rs +++ b/core/src/services/azblob/backend.rs @@ -586,11 +586,7 @@ impl Accessor for AzblobBackend { let status = resp.status(); match status { - StatusCode::OK | StatusCode::PARTIAL_CONTENT => { - let meta = parse_into_metadata(path, resp.headers())?; - - Ok((RpRead::with_metadata(meta), resp.into_body())) - } + StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/azdls/backend.rs b/core/src/services/azdls/backend.rs index 9351a2522..1ab176255 100644 --- a/core/src/services/azdls/backend.rs +++ b/core/src/services/azdls/backend.rs @@ -292,10 +292,7 @@ impl Accessor for AzdlsBackend { let status = resp.status(); match status { - StatusCode::OK | StatusCode::PARTIAL_CONTENT => { - let meta = parse_into_metadata(path, resp.headers())?; - Ok((RpRead::with_metadata(meta), resp.into_body())) - } + StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/azfile/backend.rs b/core/src/services/azfile/backend.rs index c7d8ad41c..78af5035d 100644 --- a/core/src/services/azfile/backend.rs +++ b/core/src/services/azfile/backend.rs @@ -310,10 +310,7 @@ impl Accessor for AzfileBackend { let status = resp.status(); match status { - StatusCode::OK | StatusCode::PARTIAL_CONTENT => { - let meta = parse_into_metadata(path, resp.headers())?; - Ok((RpRead::with_metadata(meta), resp.into_body())) - } + StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs index d376abcb7..5ffe4be23 
100644 --- a/core/src/services/cos/backend.rs +++ b/core/src/services/cos/backend.rs @@ -332,10 +332,7 @@ impl Accessor for CosBackend { let status = resp.status(); match status { - StatusCode::OK | StatusCode::PARTIAL_CONTENT => { - let meta = parse_into_metadata(path, resp.headers())?; - Ok((RpRead::with_metadata(meta), resp.into_body())) - } + StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/dropbox/backend.rs b/core/src/services/dropbox/backend.rs index 8400b3300..bbe3b0b18 100644 --- a/core/src/services/dropbox/backend.rs +++ b/core/src/services/dropbox/backend.rs @@ -97,10 +97,7 @@ impl Accessor for DropboxBackend { let resp = self.core.dropbox_get(path, args).await?; let status = resp.status(); match status { - StatusCode::OK | StatusCode::PARTIAL_CONTENT => { - let meta = parse_into_metadata(path, resp.headers())?; - Ok((RpRead::with_metadata(meta), resp.into_body())) - } + StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs index 034d75e03..4b0d14207 100644 --- a/core/src/services/fs/backend.rs +++ b/core/src/services/fs/backend.rs @@ -327,7 +327,7 @@ impl Accessor for FsBackend { } let r = oio::TokioReader::new(f); - Ok((RpRead::new(0), r)) + Ok((RpRead::new(), r)) } async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> { @@ -500,7 +500,7 @@ impl Accessor for FsBackend { let r = oio::StdReader::new(f); - Ok((RpRead::new(0), r)) + Ok((RpRead::new(), r)) } fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs index 2fe8ffe9a..0fe5a1778 100644 --- a/core/src/services/ftp/backend.rs +++ b/core/src/services/ftp/backend.rs @@ -15,7 +15,6 @@ // specific 
language governing permissions and limitations // under the License. -use std::cmp::min; use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; @@ -319,37 +318,38 @@ impl Accessor for FtpBackend { return Ok(RpCreateDir::default()); } + /// TODO: migrate to FileReader maybe? async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { let mut ftp_stream = self.ftp_connect(Operation::Read).await?; let meta = self.ftp_stat(path).await?; let br = args.range(); - let (r, size): (Box<dyn AsyncRead + Send + Unpin>, _) = match (br.offset(), br.size()) { + let r: Box<dyn AsyncRead + Send + Unpin> = match (br.offset(), br.size()) { (Some(offset), Some(size)) => { ftp_stream.resume_transfer(offset as usize).await?; let ds = ftp_stream.retr_as_stream(path).await?.take(size); - (Box::new(ds), min(size, meta.size() as u64 - offset)) + Box::new(ds) } (Some(offset), None) => { ftp_stream.resume_transfer(offset as usize).await?; let ds = ftp_stream.retr_as_stream(path).await?; - (Box::new(ds), meta.size() as u64 - offset) + Box::new(ds) } (None, Some(size)) => { ftp_stream .resume_transfer((meta.size() as u64 - size) as usize) .await?; let ds = ftp_stream.retr_as_stream(path).await?; - (Box::new(ds), size) + Box::new(ds) } (None, None) => { let ds = ftp_stream.retr_as_stream(path).await?; - (Box::new(ds), meta.size() as u64) + Box::new(ds) } }; - Ok((RpRead::new(size), FtpReader::new(r, ftp_stream))) + Ok((RpRead::new(), FtpReader::new(r, ftp_stream))) } async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs index e65fb155d..c667920d9 100644 --- a/core/src/services/gcs/backend.rs +++ b/core/src/services/gcs/backend.rs @@ -389,8 +389,7 @@ impl Accessor for GcsBackend { let resp = self.core.gcs_get_object(path, &args).await?; if resp.status().is_success() { - let meta = parse_into_metadata(path, resp.headers())?; - 
Ok((RpRead::with_metadata(meta), resp.into_body())) + Ok((RpRead::new(), resp.into_body())) } else { Err(parse_error(resp).await?) } diff --git a/core/src/services/gdrive/backend.rs b/core/src/services/gdrive/backend.rs index 879eab7cc..53ab999ea 100644 --- a/core/src/services/gdrive/backend.rs +++ b/core/src/services/gdrive/backend.rs @@ -118,26 +118,12 @@ impl Accessor for GdriveBackend { } async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> { - // We need to request for metadata and body separately here. - // Request for metadata first to check if the file exists. - let resp = self.core.gdrive_stat(path).await?; + let resp = self.core.gdrive_get(path).await?; let status = resp.status(); match status { - StatusCode::OK => { - let body = resp.into_body().bytes().await?; - let meta = self.parse_metadata(body)?; - - let resp = self.core.gdrive_get(path).await?; - - let status = resp.status(); - - match status { - StatusCode::OK => Ok((RpRead::with_metadata(meta), resp.into_body())), - _ => Err(parse_error(resp).await?), - } - } + StatusCode::OK => Ok((RpRead::new(), resp.into_body())), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/ghac/backend.rs b/core/src/services/ghac/backend.rs index 83c6a46d6..941d89ce7 100644 --- a/core/src/services/ghac/backend.rs +++ b/core/src/services/ghac/backend.rs @@ -350,10 +350,7 @@ impl Accessor for GhacBackend { let status = resp.status(); match status { - StatusCode::OK | StatusCode::PARTIAL_CONTENT => { - let meta = parse_into_metadata(path, resp.headers())?; - Ok((RpRead::with_metadata(meta), resp.into_body())) - } + StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs index fed635b15..a52d9c284 100644 --- a/core/src/services/hdfs/backend.rs +++ b/core/src/services/hdfs/backend.rs @@ -213,7 +213,7 @@ impl Accessor for 
HdfsBackend { let r = oio::FuturesReader::new(f); - Ok((RpRead::new(0), r)) + Ok((RpRead::new(), r)) } async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> { @@ -335,7 +335,7 @@ impl Accessor for HdfsBackend { let r = oio::StdReader::new(f); - Ok((RpRead::new(0), r)) + Ok((RpRead::new(), r)) } fn blocking_write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { diff --git a/core/src/services/http/backend.rs b/core/src/services/http/backend.rs index af50631cc..2877d5aa2 100644 --- a/core/src/services/http/backend.rs +++ b/core/src/services/http/backend.rs @@ -237,10 +237,7 @@ impl Accessor for HttpBackend { let status = resp.status(); match status { - StatusCode::OK | StatusCode::PARTIAL_CONTENT => { - let meta = parse_into_metadata(path, resp.headers())?; - Ok((RpRead::with_metadata(meta), resp.into_body())) - } + StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/ipfs/backend.rs b/core/src/services/ipfs/backend.rs index ebf1e327e..6a557d6a4 100644 --- a/core/src/services/ipfs/backend.rs +++ b/core/src/services/ipfs/backend.rs @@ -194,10 +194,7 @@ impl Accessor for IpfsBackend { let status = resp.status(); match status { - StatusCode::OK | StatusCode::PARTIAL_CONTENT => { - let meta = parse_into_metadata(path, resp.headers())?; - Ok((RpRead::with_metadata(meta), resp.into_body())) - } + StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/ipmfs/backend.rs b/core/src/services/ipmfs/backend.rs index e7999767b..303b1db8d 100644 --- a/core/src/services/ipmfs/backend.rs +++ b/core/src/services/ipmfs/backend.rs @@ -112,10 +112,7 @@ impl Accessor for IpmfsBackend { let status = resp.status(); match status { - StatusCode::OK => { - let meta = parse_into_metadata(path, resp.headers())?; - 
Ok((RpRead::with_metadata(meta), resp.into_body())) - } + StatusCode::OK => Ok((RpRead::new(), resp.into_body())), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs index aa5a86864..18bcefb52 100644 --- a/core/src/services/obs/backend.rs +++ b/core/src/services/obs/backend.rs @@ -359,10 +359,7 @@ impl Accessor for ObsBackend { let status = resp.status(); match status { - StatusCode::OK | StatusCode::PARTIAL_CONTENT => { - let meta = parse_into_metadata(path, resp.headers())?; - Ok((RpRead::with_metadata(meta), resp.into_body())) - } + StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/onedrive/backend.rs b/core/src/services/onedrive/backend.rs index 0a7952e4a..8b1959b9b 100644 --- a/core/src/services/onedrive/backend.rs +++ b/core/src/services/onedrive/backend.rs @@ -93,10 +93,7 @@ impl Accessor for OnedriveBackend { let status = resp.status(); match status { - StatusCode::OK | StatusCode::PARTIAL_CONTENT => { - let meta = parse_into_metadata(path, resp.headers())?; - Ok((RpRead::with_metadata(meta), resp.into_body())) - } + StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())), _ => Err(parse_error(resp).await?), } diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index 8ce0d65d8..043993699 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -472,10 +472,7 @@ impl Accessor for OssBackend { let status = resp.status(); match status { - StatusCode::OK | StatusCode::PARTIAL_CONTENT => { - let meta = parse_into_metadata(path, resp.headers())?; - Ok((RpRead::with_metadata(meta), resp.into_body())) - } + StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/s3/backend.rs 
b/core/src/services/s3/backend.rs index f9e160502..e6b862982 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -969,10 +969,7 @@ impl Accessor for S3Backend { let status = resp.status(); match status { - StatusCode::OK | StatusCode::PARTIAL_CONTENT => { - let meta = parse_into_metadata(path, resp.headers())?; - Ok((RpRead::with_metadata(meta), resp.into_body())) - } + StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs index 5c67dab76..792e27f49 100644 --- a/core/src/services/sftp/backend.rs +++ b/core/src/services/sftp/backend.rs @@ -300,7 +300,7 @@ impl Accessor for SftpBackend { // - `oio::TokioReader::new(x)` makes it a `oio::TokioReader` which implements `oio::Read`. let r = oio::TokioReader::new(Box::pin(TokioCompatFile::new(f))); - Ok((RpRead::new(0), r)) + Ok((RpRead::new(), r)) } async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> { diff --git a/core/src/services/supabase/backend.rs b/core/src/services/supabase/backend.rs index a5d0d3db1..b96bbd3ef 100644 --- a/core/src/services/supabase/backend.rs +++ b/core/src/services/supabase/backend.rs @@ -215,10 +215,7 @@ impl Accessor for SupabaseBackend { let status = resp.status(); match status { - StatusCode::OK | StatusCode::PARTIAL_CONTENT => { - let meta = parse_into_metadata(path, resp.headers())?; - Ok((RpRead::with_metadata(meta), resp.into_body())) - } + StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/vercel_artifacts/backend.rs b/core/src/services/vercel_artifacts/backend.rs index 9a0ae95cf..432b82b68 100644 --- a/core/src/services/vercel_artifacts/backend.rs +++ b/core/src/services/vercel_artifacts/backend.rs @@ -74,10 +74,7 @@ impl Accessor for VercelArtifactsBackend { let 
status = resp.status(); match status { - StatusCode::OK | StatusCode::PARTIAL_CONTENT => { - let meta = parse_into_metadata(path, resp.headers())?; - Ok((RpRead::with_metadata(meta), resp.into_body())) - } + StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())), _ => Err(parse_error(resp).await?), } diff --git a/core/src/services/wasabi/backend.rs b/core/src/services/wasabi/backend.rs index a7a41042f..c2f746e77 100644 --- a/core/src/services/wasabi/backend.rs +++ b/core/src/services/wasabi/backend.rs @@ -738,10 +738,7 @@ impl Accessor for WasabiBackend { let status = resp.status(); match status { - StatusCode::OK | StatusCode::PARTIAL_CONTENT => { - let meta = parse_into_metadata(path, resp.headers())?; - Ok((RpRead::with_metadata(meta), resp.into_body())) - } + StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/webdav/backend.rs b/core/src/services/webdav/backend.rs index 4c4cf6b83..24c80a3da 100644 --- a/core/src/services/webdav/backend.rs +++ b/core/src/services/webdav/backend.rs @@ -267,10 +267,7 @@ impl Accessor for WebdavBackend { let resp = self.webdav_get(path, args).await?; let status = resp.status(); match status { - StatusCode::OK | StatusCode::PARTIAL_CONTENT => { - let meta = parse_into_metadata(path, resp.headers())?; - Ok((RpRead::with_metadata(meta), resp.into_body())) - } + StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/webhdfs/backend.rs b/core/src/services/webhdfs/backend.rs index 4adbc6ab9..3863b2dd3 100644 --- a/core/src/services/webhdfs/backend.rs +++ b/core/src/services/webhdfs/backend.rs @@ -465,10 +465,7 @@ impl Accessor for WebhdfsBackend { let range = args.range(); let resp = self.webhdfs_read_file(path, range).await?; match resp.status() { - StatusCode::OK | StatusCode::PARTIAL_CONTENT => { - 
let meta = parse_into_metadata(path, resp.headers())?; - Ok((RpRead::with_metadata(meta), resp.into_body())) - } + StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index 3dfa0d121..f55a1c48a 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -15,11 +15,10 @@ // specific language governing permissions and limitations // under the License. -use std::io::Read; - use bytes::Bytes; use super::operator_functions::*; +use crate::raw::oio::BlockingRead; use crate::raw::oio::WriteBuf; use crate::raw::*; use crate::*; @@ -339,22 +338,12 @@ impl BlockingOperator { ); } - let (rp, mut s) = inner.blocking_read(&path, args)?; - let mut buffer = Vec::with_capacity(rp.into_metadata().content_length() as usize); + let (_, mut s) = inner.blocking_read(&path, args)?; - match s.read_to_end(&mut buffer) { - Ok(n) => { - buffer.truncate(n); - Ok(buffer) - } - Err(err) => Err( - Error::new(ErrorKind::Unexpected, "blocking read_with failed") - .with_operation("BlockingOperator::read_with") - .with_context("service", inner.info().scheme().into_static()) - .with_context("path", &path) - .set_source(err), - ), - } + let mut buf = Vec::new(); + s.read_to_end(&mut buf)?; + + Ok(buf) }, )) } diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs index 00bb13e10..1abb1f027 100644 --- a/core/src/types/operator/operator.rs +++ b/core/src/types/operator/operator.rs @@ -20,15 +20,13 @@ use std::time::Duration; use bytes::Buf; use bytes::Bytes; use futures::stream; -use futures::AsyncReadExt; use futures::Stream; use futures::StreamExt; use futures::TryStreamExt; -use tokio::io::ReadBuf; use super::BlockingOperator; use crate::operator_futures::*; -use crate::raw::oio::WriteExt; +use crate::raw::oio::{ReadExt, WriteExt}; use 
crate::raw::*; use crate::*; @@ -368,32 +366,11 @@ impl Operator { .with_context("path", &path)); } - let br = args.range(); - let (rp, mut s) = inner.read(&path, args).await?; + let (_, mut s) = inner.read(&path, args).await?; + let mut buf = Vec::new(); + s.read_to_end(&mut buf).await?; - let length = rp.into_metadata().content_length() as usize; - let mut buffer = Vec::with_capacity(length); - - let dst = buffer.spare_capacity_mut(); - let mut buf = ReadBuf::uninit(dst); - - // Safety: the input buffer is created with_capacity(length). - unsafe { buf.assume_init(length) }; - - // TODO: use native read api - s.read_exact(buf.initialized_mut()).await.map_err(|err| { - Error::new(ErrorKind::Unexpected, "read from storage") - .with_operation("read") - .with_context("service", inner.info().scheme().into_static()) - .with_context("path", &path) - .with_context("range", br.to_string()) - .set_source(err) - })?; - - // Safety: read_exact makes sure this buffer has been filled. - unsafe { buffer.set_len(length) } - - Ok(buffer) + Ok(buf) }; Box::pin(fut)
