This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch lazy-reader
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit e52ba54ad93b25a3545c24ec1593fef296318dd8
Author: Xuanwo <[email protected]>
AuthorDate: Wed Oct 25 17:47:00 2023 +0800

    Polish API
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/layers/complete.rs                        | 10 +--
 core/src/raw/oio/read/mod.rs                       |  5 +-
 ...nto_seekable_read_by_range.rs => range_read.rs} | 92 +++++++++++-----------
 3 files changed, 55 insertions(+), 52 deletions(-)

diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index c0fe7b480..903b64e6b 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -29,10 +29,10 @@ use bytes::Bytes;
 
 use crate::raw::oio::into_flat_page;
 use crate::raw::oio::into_hierarchy_page;
-use crate::raw::oio::ByRangeSeekableReader;
 use crate::raw::oio::Entry;
 use crate::raw::oio::FlatPager;
 use crate::raw::oio::HierarchyPager;
+use crate::raw::oio::RangeReader;
 use crate::raw::oio::StreamableReader;
 use crate::raw::*;
 use crate::*;
@@ -171,7 +171,7 @@ impl<A: Accessor> CompleteAccessor<A> {
                 Ok((rp, CompleteReader::NeedStreamable(r)))
             }
             _ => {
-                let r = oio::into_seekable_read_by_range(self.inner.clone(), 
path, args);
+                let r = RangeReader::new(self.inner.clone(), path, args);
 
                 if streamable {
                     Ok((rp, CompleteReader::NeedSeekable(r)))
@@ -205,7 +205,7 @@ impl<A: Accessor> CompleteAccessor<A> {
                 Ok((rp, CompleteReader::NeedStreamable(r)))
             }
             _ => {
-                let r = oio::into_seekable_read_by_range(self.inner.clone(), 
path, args);
+                let r = RangeReader::new(self.inner.clone(), path, args);
 
                 if streamable {
                     Ok((rp, CompleteReader::NeedSeekable(r)))
@@ -547,9 +547,9 @@ impl<A: Accessor> LayeredAccessor for CompleteAccessor<A> {
 
 pub enum CompleteReader<A: Accessor, R> {
     AlreadyComplete(R),
-    NeedSeekable(ByRangeSeekableReader<A, R>),
+    NeedSeekable(RangeReader<A, R>),
     NeedStreamable(StreamableReader<R>),
-    NeedBoth(StreamableReader<ByRangeSeekableReader<A, R>>),
+    NeedBoth(StreamableReader<RangeReader<A, R>>),
 }
 
 impl<A, R> oio::Read for CompleteReader<A, R>
diff --git a/core/src/raw/oio/read/mod.rs b/core/src/raw/oio/read/mod.rs
index 1415d630e..f7d971782 100644
--- a/core/src/raw/oio/read/mod.rs
+++ b/core/src/raw/oio/read/mod.rs
@@ -27,9 +27,8 @@ mod into_streamable_read;
 pub use into_streamable_read::into_streamable_read;
 pub use into_streamable_read::StreamableReader;
 
-mod into_seekable_read_by_range;
-pub use into_seekable_read_by_range::into_seekable_read_by_range;
-pub use into_seekable_read_by_range::ByRangeSeekableReader;
+mod range_read;
+pub use range_read::RangeReader;
 
 mod into_read_from_file;
 pub use into_read_from_file::into_read_from_file;
diff --git a/core/src/raw/oio/read/into_seekable_read_by_range.rs 
b/core/src/raw/oio/read/range_read.rs
similarity index 93%
rename from core/src/raw/oio/read/into_seekable_read_by_range.rs
rename to core/src/raw/oio/read/range_read.rs
index 0092863a6..870285e42 100644
--- a/core/src/raw/oio/read/into_seekable_read_by_range.rs
+++ b/core/src/raw/oio/read/range_read.rs
@@ -29,45 +29,16 @@ use futures::future::BoxFuture;
 use crate::raw::*;
 use crate::*;
 
-/// Convert given reader into [`oio::Reader`] by range.
+/// RangeReader that can do seek on non-seekable reader.
 ///
-/// # Input
+/// `oio::Reader` requires the underlying reader to be seekable, but some 
services like s3, gcs
+/// doesn't support seek natively. RangeReader implement seek by 
read_with_range. We will start
+/// a new read request with the correct range when seek is called.
 ///
-/// The input is an Accessor will may return a non-seekable reader.
-///
-/// # Output
-///
-/// The output is a reader that can be seek by range.
-///
-/// # Notes
-///
-/// This operation is not zero cost. If the accessor already returns a
-/// seekable reader, please don't use this.
-pub fn into_seekable_read_by_range<A: Accessor, R>(
-    acc: Arc<A>,
-    path: &str,
-    op: OpRead,
-) -> ByRangeSeekableReader<A, R> {
-    // Normalize range like `..` into `0..` to make sure offset is valid.
-    let (offset, size) = match (op.range().offset(), op.range().size()) {
-        (None, None) => (Some(0), None),
-        v => v,
-    };
-
-    ByRangeSeekableReader {
-        acc,
-        path: Arc::new(path.to_string()),
-        op,
-
-        offset,
-        size,
-        cur: 0,
-        state: State::<R>::Idle,
-    }
-}
-
-/// ByRangeReader that can do seek on non-seekable reader.
-pub struct ByRangeSeekableReader<A: Accessor, R> {
+/// The `seek` operation on `RangeReader` is zero cost and purely in-memory. 
But calling `seek`
+/// while there is a pending read request will cancel the request and start a 
new one. This could
+/// add extra cost to the read operation.
+pub struct RangeReader<A: Accessor, R> {
     acc: Arc<A>,
     path: Arc<String>,
     op: OpRead,
@@ -88,10 +59,43 @@ enum State<R> {
 /// Safety: State will only be accessed under &mut.
 unsafe impl<R> Sync for State<R> {}
 
-impl<A, R> ByRangeSeekableReader<A, R>
+impl<A, R> RangeReader<A, R>
 where
     A: Accessor,
 {
+    /// Create a new [`oio::Reader`] by range support.
+    ///
+    /// # Input
+    ///
+    /// The input is an Accessor will may return a non-seekable reader.
+    ///
+    /// # Output
+    ///
+    /// The output is a reader that can be seek by range.
+    ///
+    /// # Notes
+    ///
+    /// This operation is not zero cost. If the accessor already returns a
+    /// seekable reader, please don't use this.
+    pub fn new(acc: Arc<A>, path: &str, op: OpRead) -> RangeReader<A, R> {
+        // Normalize range like `..` into `0..` to make sure offset is valid.
+        let (offset, size) = match (op.range().offset(), op.range().size()) {
+            (None, None) => (Some(0), None),
+            v => v,
+        };
+
+        RangeReader {
+            acc,
+            path: Arc::new(path.to_string()),
+            op,
+
+            offset,
+            size,
+            cur: 0,
+            state: State::<R>::Idle,
+        }
+    }
+
     /// Fill current reader's range by total_size.
     fn fill_range(&mut self, total_size: u64) -> Result<()> {
         (self.offset, self.size) = match (self.offset, self.size) {
@@ -138,7 +142,7 @@ where
     }
 }
 
-impl<A, R> ByRangeSeekableReader<A, R>
+impl<A, R> RangeReader<A, R>
 where
     A: Accessor<Reader = R>,
     R: oio::Read,
@@ -179,7 +183,7 @@ where
     }
 }
 
-impl<A, R> ByRangeSeekableReader<A, R>
+impl<A, R> RangeReader<A, R>
 where
     A: Accessor<BlockingReader = R>,
     R: oio::BlockingRead,
@@ -220,7 +224,7 @@ where
     }
 }
 
-impl<A, R> oio::Read for ByRangeSeekableReader<A, R>
+impl<A, R> oio::Read for RangeReader<A, R>
 where
     A: Accessor<Reader = R>,
     R: oio::Read,
@@ -415,7 +419,7 @@ where
     }
 }
 
-impl<A, R> oio::BlockingRead for ByRangeSeekableReader<A, R>
+impl<A, R> oio::BlockingRead for RangeReader<A, R>
 where
     A: Accessor<BlockingReader = R>,
     R: oio::BlockingRead,
@@ -682,7 +686,7 @@ mod tests {
         let (bs, _) = gen_bytes();
         let acc = Arc::new(MockReadService::new(bs.clone()));
 
-        let mut r = Box::new(into_seekable_read_by_range(
+        let mut r = Box::new(RangeReader::new(
             acc,
             "x",
             OpRead::default().with_range(BytesRange::from(..)),
@@ -717,7 +721,7 @@ mod tests {
         let (bs, _) = gen_bytes();
         let acc = Arc::new(MockReadService::new(bs.clone()));
 
-        let mut r = Box::new(into_seekable_read_by_range(
+        let mut r = Box::new(RangeReader::new(
             acc,
             "x",
             OpRead::default().with_range(BytesRange::from(4096..4096 + 4096)),

Reply via email to