This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch lazy-reader in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit e52ba54ad93b25a3545c24ec1593fef296318dd8 Author: Xuanwo <[email protected]> AuthorDate: Wed Oct 25 17:47:00 2023 +0800 Polish API Signed-off-by: Xuanwo <[email protected]> --- core/src/layers/complete.rs | 10 +-- core/src/raw/oio/read/mod.rs | 5 +- ...nto_seekable_read_by_range.rs => range_read.rs} | 92 +++++++++++----------- 3 files changed, 55 insertions(+), 52 deletions(-) diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index c0fe7b480..903b64e6b 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -29,10 +29,10 @@ use bytes::Bytes; use crate::raw::oio::into_flat_page; use crate::raw::oio::into_hierarchy_page; -use crate::raw::oio::ByRangeSeekableReader; use crate::raw::oio::Entry; use crate::raw::oio::FlatPager; use crate::raw::oio::HierarchyPager; +use crate::raw::oio::RangeReader; use crate::raw::oio::StreamableReader; use crate::raw::*; use crate::*; @@ -171,7 +171,7 @@ impl<A: Accessor> CompleteAccessor<A> { Ok((rp, CompleteReader::NeedStreamable(r))) } _ => { - let r = oio::into_seekable_read_by_range(self.inner.clone(), path, args); + let r = RangeReader::new(self.inner.clone(), path, args); if streamable { Ok((rp, CompleteReader::NeedSeekable(r))) @@ -205,7 +205,7 @@ impl<A: Accessor> CompleteAccessor<A> { Ok((rp, CompleteReader::NeedStreamable(r))) } _ => { - let r = oio::into_seekable_read_by_range(self.inner.clone(), path, args); + let r = RangeReader::new(self.inner.clone(), path, args); if streamable { Ok((rp, CompleteReader::NeedSeekable(r))) @@ -547,9 +547,9 @@ impl<A: Accessor> LayeredAccessor for CompleteAccessor<A> { pub enum CompleteReader<A: Accessor, R> { AlreadyComplete(R), - NeedSeekable(ByRangeSeekableReader<A, R>), + NeedSeekable(RangeReader<A, R>), NeedStreamable(StreamableReader<R>), - NeedBoth(StreamableReader<ByRangeSeekableReader<A, R>>), + NeedBoth(StreamableReader<RangeReader<A, R>>), } impl<A, R> oio::Read for CompleteReader<A, R> diff --git a/core/src/raw/oio/read/mod.rs 
b/core/src/raw/oio/read/mod.rs index 1415d630e..f7d971782 100644 --- a/core/src/raw/oio/read/mod.rs +++ b/core/src/raw/oio/read/mod.rs @@ -27,9 +27,8 @@ mod into_streamable_read; pub use into_streamable_read::into_streamable_read; pub use into_streamable_read::StreamableReader; -mod into_seekable_read_by_range; -pub use into_seekable_read_by_range::into_seekable_read_by_range; -pub use into_seekable_read_by_range::ByRangeSeekableReader; +mod range_read; +pub use range_read::RangeReader; mod into_read_from_file; pub use into_read_from_file::into_read_from_file; diff --git a/core/src/raw/oio/read/into_seekable_read_by_range.rs b/core/src/raw/oio/read/range_read.rs similarity index 93% rename from core/src/raw/oio/read/into_seekable_read_by_range.rs rename to core/src/raw/oio/read/range_read.rs index 0092863a6..870285e42 100644 --- a/core/src/raw/oio/read/into_seekable_read_by_range.rs +++ b/core/src/raw/oio/read/range_read.rs @@ -29,45 +29,16 @@ use futures::future::BoxFuture; use crate::raw::*; use crate::*; -/// Convert given reader into [`oio::Reader`] by range. +/// RangeReader that can do seek on non-seekable reader. /// -/// # Input +/// `oio::Reader` requires the underlying reader to be seekable, but some services like s3, gcs +/// don't support seek natively. RangeReader implements seek by read_with_range. We will start +/// a new read request with the correct range when seek is called. /// -/// The input is an Accessor will may return a non-seekable reader. -/// -/// # Output -/// -/// The output is a reader that can be seek by range. -/// -/// # Notes -/// -/// This operation is not zero cost. If the accessor already returns a -/// seekable reader, please don't use this. -pub fn into_seekable_read_by_range<A: Accessor, R>( - acc: Arc<A>, - path: &str, - op: OpRead, -) -> ByRangeSeekableReader<A, R> { - // Normalize range like `..` into `0..` to make sure offset is valid.
- let (offset, size) = match (op.range().offset(), op.range().size()) { - (None, None) => (Some(0), None), - v => v, - }; - - ByRangeSeekableReader { - acc, - path: Arc::new(path.to_string()), - op, - - offset, - size, - cur: 0, - state: State::<R>::Idle, - } -} - -/// ByRangeReader that can do seek on non-seekable reader. -pub struct ByRangeSeekableReader<A: Accessor, R> { +/// The `seek` operation on `RangeReader` is zero cost and purely in-memory. But calling `seek` +/// while there is a pending read request will cancel the request and start a new one. This could +/// add extra cost to the read operation. +pub struct RangeReader<A: Accessor, R> { acc: Arc<A>, path: Arc<String>, op: OpRead, @@ -88,10 +59,43 @@ enum State<R> { /// Safety: State will only be accessed under &mut. unsafe impl<R> Sync for State<R> {} -impl<A, R> ByRangeSeekableReader<A, R> +impl<A, R> RangeReader<A, R> where A: Accessor, { + /// Create a new [`oio::Reader`] with range-based seek support. + /// + /// # Input + /// + /// The input is an Accessor that may return a non-seekable reader. + /// + /// # Output + /// + /// The output is a reader that can seek by range. + /// + /// # Notes + /// + /// This operation is not zero cost. If the accessor already returns a + /// seekable reader, please don't use this. + pub fn new(acc: Arc<A>, path: &str, op: OpRead) -> RangeReader<A, R> { + // Normalize range like `..` into `0..` to make sure offset is valid. + let (offset, size) = match (op.range().offset(), op.range().size()) { + (None, None) => (Some(0), None), + v => v, + }; + + RangeReader { + acc, + path: Arc::new(path.to_string()), + op, + + offset, + size, + cur: 0, + state: State::<R>::Idle, + } + } + /// Fill current reader's range by total_size.
fn fill_range(&mut self, total_size: u64) -> Result<()> { (self.offset, self.size) = match (self.offset, self.size) { @@ -138,7 +142,7 @@ where } } -impl<A, R> ByRangeSeekableReader<A, R> +impl<A, R> RangeReader<A, R> where A: Accessor<Reader = R>, R: oio::Read, @@ -179,7 +183,7 @@ where } } -impl<A, R> ByRangeSeekableReader<A, R> +impl<A, R> RangeReader<A, R> where A: Accessor<BlockingReader = R>, R: oio::BlockingRead, @@ -220,7 +224,7 @@ where } } -impl<A, R> oio::Read for ByRangeSeekableReader<A, R> +impl<A, R> oio::Read for RangeReader<A, R> where A: Accessor<Reader = R>, R: oio::Read, @@ -415,7 +419,7 @@ where } } -impl<A, R> oio::BlockingRead for ByRangeSeekableReader<A, R> +impl<A, R> oio::BlockingRead for RangeReader<A, R> where A: Accessor<BlockingReader = R>, R: oio::BlockingRead, @@ -682,7 +686,7 @@ mod tests { let (bs, _) = gen_bytes(); let acc = Arc::new(MockReadService::new(bs.clone())); - let mut r = Box::new(into_seekable_read_by_range( + let mut r = Box::new(RangeReader::new( acc, "x", OpRead::default().with_range(BytesRange::from(..)), @@ -717,7 +721,7 @@ mod tests { let (bs, _) = gen_bytes(); let acc = Arc::new(MockReadService::new(bs.clone())); - let mut r = Box::new(into_seekable_read_by_range( + let mut r = Box::new(RangeReader::new( acc, "x", OpRead::default().with_range(BytesRange::from(4096..4096 + 4096)),
