This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch lazy-reader
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 0c28f67d3f4970fe29dfe7059cda71ed3f0c71e1
Author: Xuanwo <[email protected]>
AuthorDate: Thu Oct 26 17:01:03 2023 +0800

    Add file reader support
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/oio/buf/adaptive.rs   | 140 ++++++++++
 core/src/raw/oio/buf/mod.rs        |   3 +
 core/src/raw/oio/read/file_read.rs | 546 +++++++++++++++++++++++++++++--------
 core/src/services/fs/backend.rs    |  67 +----
 core/src/services/hdfs/backend.rs  |  65 +----
 core/src/services/sftp/backend.rs  |  41 +--
 6 files changed, 599 insertions(+), 263 deletions(-)

diff --git a/core/src/raw/oio/buf/adaptive.rs b/core/src/raw/oio/buf/adaptive.rs
new file mode 100644
index 000000000..eb5b72b04
--- /dev/null
+++ b/core/src/raw/oio/buf/adaptive.rs
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::{Bytes, BytesMut};
+use std::cmp;
+use tokio::io::ReadBuf;
+
+/// Smallest target size of the adaptive buffer: 8 KiB.
+const DEFAULT_MIN_BUFFER_SIZE: usize = 8 * 1024;
+
+/// Largest target size of the adaptive buffer: 4 MiB.
+///
+/// The buffer never grows past this, however much a single read returns.
+const DEFAULT_MAX_BUFFER_SIZE: usize = 4 * 1024 * 1024;
+
+/// AdaptiveBuf is inspired by hyper 
[ReadStrategy](https://github.com/hyperium/hyper/blob/master/src/proto/h1/io.rs#L26).
+///
+/// We build this adaptive buf to make our internal buf grow and shrink 
automatically based on IO
+/// throughput.
+pub struct AdaptiveBuf {
+    /// The underlying buffer.
+    buffer: BytesMut,
+
+    next: usize,
+    decrease_now: bool,
+}
+
+impl Default for AdaptiveBuf {
+    /// Starts empty, targeting the minimum size, with no shrink pending.
+    fn default() -> Self {
+        AdaptiveBuf {
+            next: DEFAULT_MIN_BUFFER_SIZE,
+            decrease_now: false,
+            buffer: BytesMut::new(),
+        }
+    }
+}
+
+impl AdaptiveBuf {
+    /// Ensures the buffer can hold at least `next` bytes before the next read.
+    pub fn reserve(&mut self) {
+        if self.buffer.capacity() < self.next {
+            self.buffer.reserve(self.next);
+        }
+    }
+
+    /// Returns a fully initialized `ReadBuf` over the buffer's spare capacity.
+    ///
+    /// The window is limited to the current adaptive size (`next`).
+    ///
+    /// The window is zero-filled before it is handed out. `ReadBuf::initialized_mut`
+    /// exposes it as `&mut [u8]`, and claiming never-written memory as initialized
+    /// would be undefined behavior. Capping the window at `next` keeps that
+    /// zeroing proportional to the intended read size, even when leftover
+    /// capacity is much larger.
+    pub fn initialized_mut(&mut self) -> ReadBuf<'_> {
+        let len = cmp::min(self.buffer.capacity() - self.buffer.len(), self.next);
+        let mut buf = ReadBuf::uninit(&mut self.buffer.spare_capacity_mut()[..len]);
+        buf.initialize_unfilled();
+        buf
+    }
+
+    /// Records how many bytes the last read returned and adapts `next`.
+    ///
+    /// `next` doubles (up to `DEFAULT_MAX_BUFFER_SIZE`) when a read fills it.
+    /// It halves (down to `DEFAULT_MIN_BUFFER_SIZE`) only after two consecutive
+    /// reads returned less than half of it.
+    pub fn record(&mut self, read: usize) {
+        if read >= self.next {
+            // Grow when the read used the whole buffer.
+            self.next = cmp::min(self.next.saturating_mul(2), DEFAULT_MAX_BUFFER_SIZE);
+            self.decrease_now = false;
+        } else {
+            // Shrink when the read used less than half of the buffer.
+            let decr_to = self.next.saturating_div(2);
+            if read < decr_to {
+                if self.decrease_now {
+                    self.next = cmp::max(decr_to, DEFAULT_MIN_BUFFER_SIZE);
+                    self.decrease_now = false;
+                } else {
+                    // First small read: shrink only if the next read is small too.
+                    self.decrease_now = true;
+                }
+            } else {
+                // The read was within range, so keep the current size.
+                self.decrease_now = false;
+            }
+        }
+    }
+
+    /// Splits off the first `n` bytes of the buffer as frozen `Bytes`.
+    ///
+    /// Callers must pass only the number of bytes actually written into the
+    /// window returned by [`AdaptiveBuf::initialized_mut`]. That keeps every
+    /// exposed byte initialized.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `n` exceeds the buffer's capacity.
+    pub fn split(&mut self, n: usize) -> Bytes {
+        assert!(
+            n <= self.buffer.capacity(),
+            "split length exceeds buffer capacity"
+        );
+        // SAFETY: `n` is within capacity (checked above). Per the contract, the
+        // first `n` bytes lie in the window that `initialized_mut` zero-filled.
+        unsafe { self.buffer.set_len(n) }
+        self.buffer.split().freeze()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn read_strategy_adaptive_decrements() {
+        let mut huf = AdaptiveBuf::default();
+        huf.record(8192);
+        assert_eq!(huf.next, 16384);
+
+        huf.record(1);
+        assert_eq!(
+            huf.next, 16384,
+            "first smaller record doesn't decrement yet"
+        );
+        huf.record(8192);
+        assert_eq!(huf.next, 16384, "record was within range");
+
+        huf.record(1);
+        assert_eq!(
+            huf.next, 16384,
+            "in-range record should make this the 'first' again"
+        );
+
+        huf.record(1);
+        assert_eq!(huf.next, 8192, "second smaller record decrements");
+
+        huf.record(1);
+        assert_eq!(huf.next, 8192, "first doesn't decrement");
+        huf.record(1);
+        assert_eq!(huf.next, 8192, "doesn't decrement under minimum");
+    }
+
+    #[test]
+    fn read_strategy_adaptive_caps_growth() {
+        let mut buf = AdaptiveBuf::default();
+        for _ in 0..16 {
+            let next = buf.next;
+            buf.record(next);
+        }
+        assert_eq!(buf.next, DEFAULT_MAX_BUFFER_SIZE, "growth stops at the maximum");
+    }
+
+    #[test]
+    fn split_returns_written_bytes() {
+        let mut buf = AdaptiveBuf::default();
+        buf.reserve();
+        {
+            let mut rb = buf.initialized_mut();
+            let dst = rb.initialized_mut();
+            assert!(dst.len() >= DEFAULT_MIN_BUFFER_SIZE);
+            dst[..3].copy_from_slice(b"abc");
+        }
+        assert_eq!(&buf.split(3)[..], &b"abc"[..]);
+    }
+}
diff --git a/core/src/raw/oio/buf/mod.rs b/core/src/raw/oio/buf/mod.rs
index dfd3663e5..abc8bf328 100644
--- a/core/src/raw/oio/buf/mod.rs
+++ b/core/src/raw/oio/buf/mod.rs
@@ -20,3 +20,6 @@ pub use chunked_bytes::ChunkedBytes;
 
 mod write_buf;
 pub use write_buf::WriteBuf;
+
+mod adaptive;
+pub use adaptive::AdaptiveBuf;
diff --git a/core/src/raw/oio/read/file_read.rs b/core/src/raw/oio/read/file_read.rs
index 852e7c47d..b06e65fef 100644
--- a/core/src/raw/oio/read/file_read.rs
+++ b/core/src/raw/oio/read/file_read.rs
@@ -16,171 +16,495 @@
 // under the License.
 
 use std::cmp;
-use std::io::Read;
-use std::io::Seek;
+
 use std::io::SeekFrom;
 use std::pin::Pin;
+use std::sync::Arc;
 use std::task::ready;
 use std::task::Context;
 use std::task::Poll;
 
 use bytes::Bytes;
-use futures::AsyncRead;
-use futures::AsyncSeek;
+use futures::future::BoxFuture;
+use futures::Future;
 
 use crate::raw::*;
 use crate::*;
 
-/// FileReader implements [`oio::Read`] via `AsyncRead + AsyncSeek`.
-pub struct FileReader<R> {
-    inner: R,
+/// FileReader implements range read and streamable read on top of a seekable reader.
+///
+/// `oio::Reader` requires the underlying reader to handle ranges correctly and to
+/// support streaming. Some services, like `fs` and `hdfs`, only support seek.
+/// FileReader implements range and stream support based on `seek`: it maintains
+/// the correct range for the given file, and implements streaming on top of
+/// [`oio::AdaptiveBuf`].
+///
+/// The file is opened lazily through `acc` on first use.
+pub struct FileReader<A: Accessor, R> {
+    /// Accessor used to open the file.
+    acc: Arc<A>,
+    /// Path of the file inside the accessor.
+    path: Arc<String>,
+    /// Original read options; its range is applied by seeking.
+    op: OpRead,
+
+    /// Absolute position where the requested range starts; resolved on first access.
+    offset: Option<u64>,
+    /// Length of the requested range, or `None` when it extends to the end of the file.
+    size: Option<u64>,
+    /// Current position, relative to `offset`.
+    cur: u64,
 
-    start: u64,
-    end: Option<u64>,
+    buf: oio::AdaptiveBuf,
+    state: State<R>,
+}
 
-    offset: u64,
+/// Lifecycle of the underlying reader: not opened yet, opening, or ready to read.
+enum State<R> {
+    Idle,
+    Send(BoxFuture<'static, Result<(RpRead, R)>>),
+    Read(R),
 }
 
-impl<R> FileReader<R> {
+// SAFETY: `State` is only accessed through `&mut FileReader`. The only `&self`
+// method on FileReader (`read_future`) never touches it, so no `&State` is read
+// concurrently.
+unsafe impl<R> Sync for State<R> {}
+
+impl<A, R> FileReader<A, R>
+where
+    A: Accessor,
+{
     /// Create a new FileReader.
     ///
     /// # Notes
     ///
-    /// It's required that input reader's cursor is at the input `start` of the file.
-    pub fn new(fd: R, start: u64, end: Option<u64>) -> FileReader<R> {
+    /// No I/O happens here. The file is opened through `acc` on the first read,
+    /// seek or next call, and the range carried by `op` is applied by seeking the
+    /// opened reader.
+    pub fn new(acc: Arc<A>, path: &str, op: OpRead) -> FileReader<A, R> {
         FileReader {
-            inner: fd,
-            start,
-            end,
+            acc,
+            path: Arc::new(path.to_string()),
+            op,
 
-            offset: start,
+            // Range bounds are resolved lazily, once the file is opened.
+            offset: None,
+            size: None,
+            cur: 0,
+            buf: oio::AdaptiveBuf::default(),
+            state: State::<R>::Idle,
         }
     }
+}
 
-    fn calculate_position(&self, pos: SeekFrom) -> Result<SeekFrom> {
-        match pos {
-            SeekFrom::Start(n) => {
-                if n < self.start {
-                    return Err(Error::new(
-                        ErrorKind::InvalidInput,
-                        "seek to a negative position is invalid",
-                    )
-                    .with_context("position", format!("{pos:?}")));
-                }
+impl<A, R> FileReader<A, R>
+where
+    A: Accessor<Reader = R>,
+    R: oio::Read,
+{
+    /// Builds the future that opens the whole file through the accessor.
+    fn read_future(&self) -> BoxFuture<'static, Result<(RpRead, R)>> {
+        // The requested range is enforced later by seeking, so the file itself is
+        // always opened with the full range.
+        let full = self.op.clone().with_range(BytesRange::from(..));
+        let (acc, path) = (Arc::clone(&self.acc), Arc::clone(&self.path));
 
-                Ok(SeekFrom::Start(self.start + n))
+        Box::pin(async move { acc.read(path.as_str(), full).await })
+    }
+}
+
+impl<A, R> oio::Read for FileReader<A, R>
+where
+    A: Accessor<Reader = R>,
+    R: oio::Read,
+{
+    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
+        match &mut self.state {
+            State::Idle => {
+                self.state = State::Send(self.read_future());
+                self.poll_read(cx, buf)
+            }
+            State::Send(fut) => {
+                let (_, r) = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
+                    // If send future returns an error, we should reset
+                    // state to Idle so that we can retry it.
+                    self.state = State::Idle;
+                    err
+                })?;
+                self.state = State::Read(r);
+                self.poll_read(cx, buf)
             }
-            SeekFrom::End(n) => {
-                let end = if let Some(end) = self.end {
-                    end as i64 + n
+            State::Read(r) => {
+                // Resolve where the requested range starts before reading any data.
+                if self.offset.is_none() {
+                    let (offset, size) = match (self.op.range().offset(), self.op.range().size()) {
+                        (None, None) => (0, None),
+                        (None, Some(size)) => {
+                            // A suffix range covers the last `size` bytes: seek
+                            // backwards from the end, as `poll_seek` does.
+                            let start = ready!(r.poll_seek(cx, SeekFrom::End(-(size as i64))))?;
+                            (start, Some(size))
+                        }
+                        (Some(offset), None) => {
+                            let start = ready!(r.poll_seek(cx, SeekFrom::Start(offset)))?;
+                            (start, None)
+                        }
+                        (Some(offset), Some(size)) => {
+                            let start = ready!(r.poll_seek(cx, SeekFrom::Start(offset)))?;
+                            (start, Some(size))
+                        }
+                    };
+                    self.offset = Some(offset);
+                    self.size = size;
+                }
+                // Never read past the end of a bounded range. Compare in u64 so the
+                // remaining length cannot be truncated on 32-bit targets.
+                let size = if let Some(size) = self.size {
+                    if self.cur >= size {
+                        return Poll::Ready(Ok(0));
+                    }
+                    cmp::min(buf.len() as u64, size - self.cur) as usize
                 } else {
-                    n
+                    buf.len()
                 };
 
-                if self.start as i64 + end < 0 {
-                    return Err(Error::new(
-                        ErrorKind::InvalidInput,
-                        "seek to a negative position is invalid",
-                    )
-                    .with_context("position", format!("{pos:?}")));
+                match ready!(r.poll_read(cx, &mut buf[..size])) {
+                    Ok(0) => Poll::Ready(Ok(0)),
+                    Ok(n) => {
+                        self.cur += n as u64;
+                        Poll::Ready(Ok(n))
+                    }
+                    // No state reset is needed on error: polling the same reader again is fine.
+                    Err(err) => Poll::Ready(Err(err)),
                 }
+            }
+        }
+    }
 
-                Ok(SeekFrom::End(end))
+    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> {
+        match &mut self.state {
+            State::Idle => {
+                self.state = State::Send(self.read_future());
+                self.poll_seek(cx, pos)
             }
-            SeekFrom::Current(n) => {
-                if self.offset as i64 + n < self.start as i64 {
-                    return Err(Error::new(
-                        ErrorKind::InvalidInput,
-                        "seek to a negative position is invalid",
-                    )
-                    .with_context("position", format!("{pos:?}")));
+            State::Send(fut) => {
+                let (_, r) = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
+                    // If send future returns an error, we should reset
+                    // state to Idle so that we can retry it.
+                    self.state = State::Idle;
+                    err
+                })?;
+                self.state = State::Read(r);
+                self.poll_seek(cx, pos)
+            }
+            State::Read(r) => {
+                // Resolve where the requested range starts before seeking inside it.
+                if self.offset.is_none() {
+                    let (offset, size) = match (self.op.range().offset(), self.op.range().size()) {
+                        (None, None) => (0, None),
+                        (None, Some(size)) => {
+                            let start = ready!(r.poll_seek(cx, SeekFrom::End(-(size as i64))))?;
+                            (start, Some(size))
+                        }
+                        (Some(offset), None) => {
+                            let start = ready!(r.poll_seek(cx, SeekFrom::Start(offset)))?;
+                            (start, None)
+                        }
+                        (Some(offset), Some(size)) => {
+                            let start = ready!(r.poll_seek(cx, SeekFrom::Start(offset)))?;
+                            (start, Some(size))
+                        }
+                    };
+                    self.offset = Some(offset);
+                    self.size = size;
                 }
-
-                Ok(SeekFrom::Current(n))
+                let offset = self.offset.expect("offset must be resolved before seeking");
+                let pos = calculate_position(self.offset, self.size, self.cur, pos)?;
+                let cur = ready!(r.poll_seek(cx, pos))?;
+                // Translate the absolute position reported by the underlying reader
+                // back into one relative to the range start. Landing before the range
+                // start can happen when `SeekFrom::End` is forwarded with an unknown
+                // size; that is reported as an error instead of underflowing.
+                self.cur = cur.checked_sub(offset).ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::InvalidInput,
+                        "seek to a negative position is invalid",
+                    )
+                    .with_context("position", format!("{pos:?}"))
+                })?;
+                // Report the range-relative position, as `BlockingRead::seek` does.
+                Poll::Ready(Ok(self.cur))
             }
         }
     }
-}
 
-impl<R> oio::Read for FileReader<R>
-where
-    R: AsyncRead + AsyncSeek + Unpin + Send + Sync,
-{
-    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
-        let size = if let Some(end) = self.end {
-            if self.offset >= end {
-                return Poll::Ready(Ok(0));
-            }
-            cmp::min(buf.len(), (end - self.offset) as usize)
-        } else {
-            buf.len()
-        };
-
-        let n =
-            ready!(Pin::new(&mut self.inner).poll_read(cx, &mut buf[..size])).map_err(|err| {
-                Error::new(ErrorKind::Unexpected, "read data from FileReader").set_source(err)
-            })?;
-        self.offset += n as u64;
-        Poll::Ready(Ok(n))
-    }
-
-    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> {
-        let pos = self.calculate_position(pos)?;
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
+        match &mut self.state {
+            State::Idle => {
+                self.state = State::Send(self.read_future());
+                self.poll_next(cx)
+            }
+            State::Send(fut) => {
+                let (_, r) = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
+                    // If send future returns an error, we should reset
+                    // state to Idle so that we can retry it.
+                    self.state = State::Idle;
+                    err
+                })?;
+                self.state = State::Read(r);
+                self.poll_next(cx)
+            }
+            State::Read(r) => {
+                // Resolve where the requested range starts before reading any data.
+                if self.offset.is_none() {
+                    let (offset, size) = match (self.op.range().offset(), self.op.range().size()) {
+                        (None, None) => (0, None),
+                        (None, Some(size)) => {
+                            // A suffix range covers the last `size` bytes: seek
+                            // backwards from the end, as `poll_seek` does.
+                            let start = ready!(r.poll_seek(cx, SeekFrom::End(-(size as i64))))?;
+                            (start, Some(size))
+                        }
+                        (Some(offset), None) => {
+                            let start = ready!(r.poll_seek(cx, SeekFrom::Start(offset)))?;
+                            (start, None)
+                        }
+                        (Some(offset), Some(size)) => {
+                            let start = ready!(r.poll_seek(cx, SeekFrom::Start(offset)))?;
+                            (start, Some(size))
+                        }
+                    };
+                    self.offset = Some(offset);
+                    self.size = size;
+                }
 
-        let cur = ready!(Pin::new(&mut self.inner).poll_seek(cx, pos)).map_err(|err| {
-            Error::new(ErrorKind::Unexpected, "seek data from FileReader").set_source(err)
-        })?;
+                self.buf.reserve();
 
-        self.offset = cur;
-        Poll::Ready(Ok(self.offset - self.start))
-    }
+                let mut buf = self.buf.initialized_mut();
+                let buf = buf.initialized_mut();
 
-    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
-        let _ = cx;
+                // Never read past the end of a bounded range. Compare in u64 so the
+                // remaining length cannot be truncated on 32-bit targets.
+                let size = if let Some(size) = self.size {
+                    if self.cur >= size {
+                        return Poll::Ready(None);
+                    }
+                    cmp::min(buf.len() as u64, size - self.cur) as usize
+                } else {
+                    buf.len()
+                };
 
-        Poll::Ready(Some(Err(Error::new(
-            ErrorKind::Unsupported,
-            "output reader doesn't support next",
-        ))))
+                match ready!(r.poll_read(cx, &mut buf[..size])) {
+                    Ok(0) => Poll::Ready(None),
+                    Ok(n) => {
+                        self.cur += n as u64;
+                        self.buf.record(n);
+                        Poll::Ready(Some(Ok(self.buf.split(n))))
+                    }
+                    // No state reset is needed on error: polling the same reader again is fine.
+                    Err(err) => Poll::Ready(Some(Err(err))),
+                }
+            }
+        }
     }
 }
 
-impl<R> oio::BlockingRead for FileReader<R>
+impl<A, R> oio::BlockingRead for FileReader<A, R>
 where
-    R: Read + Seek + Send + Sync + 'static,
+    A: Accessor<BlockingReader = R>,
+    R: oio::BlockingRead,
 {
     fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
-        let size = if let Some(end) = self.end {
-            if self.offset >= end {
-                return Ok(0);
-            }
-            cmp::min(buf.len(), (end - self.offset) as usize)
-        } else {
-            buf.len()
-        };
-
-        let n = self.inner.read(&mut buf[..size]).map_err(|err| {
-            Error::new(ErrorKind::Unexpected, "read data from FileReader").set_source(err)
-        })?;
-        self.offset += n as u64;
-        Ok(n)
+        match &mut self.state {
+            State::Idle => {
+                // FileReader applies the range itself by seeking, so the file is
+                // always opened with the full range.
+                let op = self.op.clone().with_range(BytesRange::from(..));
+
+                let (_, r) = self.acc.blocking_read(&self.path, op)?;
+                self.state = State::Read(r);
+                self.read(buf)
+            }
+
+            State::Read(r) => {
+                // Resolve where the requested range starts before reading any data.
+                if self.offset.is_none() {
+                    let (offset, size) = match (self.op.range().offset(), self.op.range().size()) {
+                        (None, None) => (0, None),
+                        (None, Some(size)) => {
+                            // A suffix range covers the last `size` bytes: seek
+                            // backwards from the end, as `seek` does.
+                            let start = r.seek(SeekFrom::End(-(size as i64)))?;
+                            (start, Some(size))
+                        }
+                        (Some(offset), None) => {
+                            let start = r.seek(SeekFrom::Start(offset))?;
+                            (start, None)
+                        }
+                        (Some(offset), Some(size)) => {
+                            let start = r.seek(SeekFrom::Start(offset))?;
+                            (start, Some(size))
+                        }
+                    };
+                    self.offset = Some(offset);
+                    self.size = size;
+                }
+                // Never read past the end of a bounded range. Compare in u64 so the
+                // remaining length cannot be truncated on 32-bit targets.
+                let size = if let Some(size) = self.size {
+                    if self.cur >= size {
+                        return Ok(0);
+                    }
+                    cmp::min(buf.len() as u64, size - self.cur) as usize
+                } else {
+                    buf.len()
+                };
+
+                // No state reset is needed on error: reading the same reader again is fine.
+                let n = r.read(&mut buf[..size])?;
+                self.cur += n as u64;
+                Ok(n)
+            }
+            State::Send(_) => {
+                unreachable!(
+                    "It's invalid to go into State::Send for BlockingRead, please report this bug"
+                )
+            }
+        }
     }
 
     fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
-        let pos = self.calculate_position(pos)?;
-
-        let cur = self.inner.seek(pos).map_err(|err| {
-            Error::new(ErrorKind::Unexpected, "seek data from FileReader").set_source(err)
-        })?;
+        match &mut self.state {
+            State::Idle => {
+                // FileReader applies the range itself by seeking, so the file is
+                // always opened with the full range.
+                let op = self.op.clone().with_range(BytesRange::from(..));
 
-        self.offset = cur;
-        Ok(self.offset - self.start)
+                let (_, r) = self.acc.blocking_read(&self.path, op)?;
+                self.state = State::Read(r);
+                self.seek(pos)
+            }
+            State::Read(r) => {
+                // Resolve where the requested range starts before seeking inside it.
+                if self.offset.is_none() {
+                    let (offset, size) = match (self.op.range().offset(), self.op.range().size()) {
+                        (None, None) => (0, None),
+                        (None, Some(size)) => {
+                            let start = r.seek(SeekFrom::End(-(size as i64)))?;
+                            (start, Some(size))
+                        }
+                        (Some(offset), None) => {
+                            let start = r.seek(SeekFrom::Start(offset))?;
+                            (start, None)
+                        }
+                        (Some(offset), Some(size)) => {
+                            let start = r.seek(SeekFrom::Start(offset))?;
+                            (start, Some(size))
+                        }
+                    };
+                    self.offset = Some(offset);
+                    self.size = size;
+                }
+                let offset = self.offset.expect("offset must be resolved before seeking");
+                let pos = calculate_position(self.offset, self.size, self.cur, pos)?;
+                let cur = r.seek(pos)?;
+                // Translate the absolute position reported by the underlying reader
+                // back into one relative to the range start. Landing before the range
+                // start can happen when `SeekFrom::End` is forwarded with an unknown
+                // size; that is reported as an error instead of underflowing.
+                self.cur = cur.checked_sub(offset).ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::InvalidInput,
+                        "seek to a negative position is invalid",
+                    )
+                    .with_context("position", format!("{pos:?}"))
+                })?;
+                Ok(self.cur)
+            }
+            State::Send(_) => {
+                unreachable!(
+                    "It's invalid to go into State::Send for BlockingRead, please report this bug"
+                )
+            }
+        }
     }
 
     fn next(&mut self) -> Option<Result<Bytes>> {
-        Some(Err(Error::new(
-            ErrorKind::Unsupported,
-            "output reader doesn't support iterating",
-        )))
+        match &mut self.state {
+            State::Idle => {
+                // FileReader applies the range itself by seeking, so the file is
+                // always opened with the full range.
+                let op = self.op.clone().with_range(BytesRange::from(..));
+
+                let r = match self.acc.blocking_read(&self.path, op) {
+                    Ok((_, r)) => r,
+                    Err(err) => return Some(Err(err)),
+                };
+                self.state = State::Read(r);
+                self.next()
+            }
+
+            State::Read(r) => {
+                // Resolve where the requested range starts before reading any data.
+                if self.offset.is_none() {
+                    let (offset, size) = match (self.op.range().offset(), self.op.range().size()) {
+                        (None, None) => (0, None),
+                        (None, Some(size)) => {
+                            // A suffix range covers the last `size` bytes: seek
+                            // backwards from the end, as `seek` does.
+                            let start = match r.seek(SeekFrom::End(-(size as i64))) {
+                                Ok(v) => v,
+                                Err(err) => return Some(Err(err)),
+                            };
+                            (start, Some(size))
+                        }
+                        (Some(offset), None) => {
+                            let start = match r.seek(SeekFrom::Start(offset)) {
+                                Ok(v) => v,
+                                Err(err) => return Some(Err(err)),
+                            };
+                            (start, None)
+                        }
+                        (Some(offset), Some(size)) => {
+                            let start = match r.seek(SeekFrom::Start(offset)) {
+                                Ok(v) => v,
+                                Err(err) => return Some(Err(err)),
+                            };
+                            (start, Some(size))
+                        }
+                    };
+                    self.offset = Some(offset);
+                    self.size = size;
+                }
+
+                self.buf.reserve();
+
+                let mut buf = self.buf.initialized_mut();
+                let buf = buf.initialized_mut();
+
+                // Never read past the end of a bounded range. Compare in u64 so the
+                // remaining length cannot be truncated on 32-bit targets.
+                let size = if let Some(size) = self.size {
+                    if self.cur >= size {
+                        return None;
+                    }
+                    cmp::min(buf.len() as u64, size - self.cur) as usize
+                } else {
+                    buf.len()
+                };
+
+                match r.read(&mut buf[..size]) {
+                    Ok(0) => None,
+                    Ok(n) => {
+                        self.cur += n as u64;
+                        self.buf.record(n);
+                        Some(Ok(self.buf.split(n)))
+                    }
+                    // No state reset is needed on error: reading the same reader again is fine.
+                    Err(err) => Some(Err(err)),
+                }
+            }
+            State::Send(_) => {
+                unreachable!(
+                    "It's invalid to go into State::Send for BlockingRead, please report this bug"
+                )
+            }
+        }
+    }
+}
+
+/// Calculate the actual position that we should seek to.
+fn calculate_position(
+    offset: Option<u64>,
+    size: Option<u64>,
+    cur: u64,
+    pos: SeekFrom,
+) -> Result<SeekFrom> {
+    let offset = offset.expect("offset should be set for calculate_position");
+
+    match pos {
+        SeekFrom::Start(n) => {
+            // It's valid for user to seek outsides end of the file.
+            Ok(SeekFrom::Start(offset + n))
+        }
+        SeekFrom::End(n) => {
+            if let Some(size) = size {
+                if size as i64 + n < 0 {
+                    return Err(Error::new(
+                        ErrorKind::InvalidInput,
+                        "seek to a negative position is invalid",
+                    )
+                    .with_context("position", format!("{pos:?}")));
+                }
+                // size is known, we can convert SeekFrom::End into 
SeekFrom::Start.
+                Ok(SeekFrom::Start(offset + (size as i64 + n) as u64))
+            } else {
+                // size unknown means we can forward seek end to underlying 
reader directly.
+                Ok(SeekFrom::End(n))
+            }
+        }
+        SeekFrom::Current(n) => {
+            if cur as i64 + n < 0 {
+                return Err(Error::new(
+                    ErrorKind::InvalidInput,
+                    "seek to a negative position is invalid",
+                )
+                .with_context("position", format!("{pos:?}")));
+            }
+            Ok(SeekFrom::Start(offset + (cur as i64 + n) as u64))
+        }
     }
 }
diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs
index 524c4ef5e..034d75e03 100644
--- a/core/src/services/fs/backend.rs
+++ b/core/src/services/fs/backend.rs
@@ -16,11 +16,9 @@
 // under the License.
 
 use std::collections::HashMap;
-use std::io::SeekFrom;
 use std::path::Path;
 use std::path::PathBuf;
 
-use async_compat::Compat;
 use async_trait::async_trait;
 use chrono::DateTime;
 use log::debug;
@@ -246,8 +244,8 @@ impl FsBackend {
 
 #[async_trait]
 impl Accessor for FsBackend {
-    type Reader = oio::FileReader<Compat<tokio::fs::File>>;
-    type BlockingReader = oio::FileReader<std::fs::File>;
+    type Reader = oio::TokioReader<tokio::fs::File>;
+    type BlockingReader = oio::StdReader<std::fs::File>;
     type Writer = FsWriter<tokio::fs::File>;
     type BlockingWriter = FsWriter<std::fs::File>;
     type Pager = Option<FsPager<tokio::fs::ReadDir>>;
@@ -262,7 +260,6 @@ impl Accessor for FsBackend {
 
                 read: true,
                 read_can_seek: true,
-                read_with_range: true,
 
                 write: true,
                 write_can_empty: true,
@@ -303,12 +300,10 @@ impl Accessor for FsBackend {
     /// - open file first, and than use `seek`. (100ns)
     ///
     /// Benchmark could be found 
[here](https://gist.github.com/Xuanwo/48f9cfbc3022ea5f865388bb62e1a70f)
-    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
-        use tokio::io::AsyncSeekExt;
-
+    async fn read(&self, path: &str, _: OpRead) -> Result<(RpRead, 
Self::Reader)> {
         let p = self.root.join(path.trim_end_matches('/'));
 
-        let mut f = tokio::fs::OpenOptions::new()
+        let f = tokio::fs::OpenOptions::new()
             .read(true)
             .open(&p)
             .await
@@ -331,33 +326,7 @@ impl Accessor for FsBackend {
             }
         }
 
-        let (start, end) = match (args.range().offset(), args.range().size()) {
-            (None, None) => (0, None),
-            (None, Some(size)) => {
-                let start = f
-                    .seek(SeekFrom::End(size as i64))
-                    .await
-                    .map_err(new_std_io_error)?;
-                (start, Some(start + size))
-            }
-            (Some(offset), None) => {
-                let start = f
-                    .seek(SeekFrom::Start(offset))
-                    .await
-                    .map_err(new_std_io_error)?;
-                (start, None)
-            }
-            (Some(offset), Some(size)) => {
-                let start = f
-                    .seek(SeekFrom::Start(offset))
-                    .await
-                    .map_err(new_std_io_error)?;
-                (start, Some(size))
-            }
-        };
-
-        let r = oio::FileReader::new(Compat::new(f), start, end);
-
+        let r = oio::TokioReader::new(f);
         Ok((RpRead::new(0), r))
     }
 
@@ -504,12 +473,10 @@ impl Accessor for FsBackend {
         Ok(RpCreateDir::default())
     }
 
-    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::BlockingReader)> {
-        use std::io::Seek;
-
+    fn blocking_read(&self, path: &str, _: OpRead) -> Result<(RpRead, 
Self::BlockingReader)> {
         let p = self.root.join(path.trim_end_matches('/'));
 
-        let mut f = std::fs::OpenOptions::new()
+        let f = std::fs::OpenOptions::new()
             .read(true)
             .open(p)
             .map_err(new_std_io_error)?;
@@ -531,25 +498,7 @@ impl Accessor for FsBackend {
             }
         }
 
-        let (start, end) = match (args.range().offset(), args.range().size()) {
-            (None, None) => (0, None),
-            (None, Some(size)) => {
-                let start = f
-                    .seek(SeekFrom::End(size as i64))
-                    .map_err(new_std_io_error)?;
-                (start, Some(start + size))
-            }
-            (Some(offset), None) => {
-                let start = 
f.seek(SeekFrom::Start(offset)).map_err(new_std_io_error)?;
-                (start, None)
-            }
-            (Some(offset), Some(size)) => {
-                let start = 
f.seek(SeekFrom::Start(offset)).map_err(new_std_io_error)?;
-                (start, Some(size))
-            }
-        };
-
-        let r = oio::FileReader::new(f, start, end);
+        let r = oio::StdReader::new(f);
 
         Ok((RpRead::new(0), r))
     }
diff --git a/core/src/services/hdfs/backend.rs 
b/core/src/services/hdfs/backend.rs
index a7fb23f0c..fed635b15 100644
--- a/core/src/services/hdfs/backend.rs
+++ b/core/src/services/hdfs/backend.rs
@@ -18,7 +18,6 @@
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::io;
-use std::io::SeekFrom;
 use std::path::PathBuf;
 use std::sync::Arc;
 
@@ -158,8 +157,8 @@ unsafe impl Sync for HdfsBackend {}
 
 #[async_trait]
 impl Accessor for HdfsBackend {
-    type Reader = oio::FileReader<hdrs::AsyncFile>;
-    type BlockingReader = oio::FileReader<hdrs::File>;
+    type Reader = oio::FuturesReader<hdrs::AsyncFile>;
+    type BlockingReader = oio::StdReader<hdrs::File>;
     type Writer = HdfsWriter<hdrs::AsyncFile>;
     type BlockingWriter = HdfsWriter<hdrs::File>;
     type Pager = Option<HdfsPager>;
@@ -174,7 +173,6 @@ impl Accessor for HdfsBackend {
 
                 read: true,
                 read_can_seek: true,
-                read_with_range: true,
 
                 write: true,
                 // TODO: wait for 
https://github.com/apache/incubator-opendal/pull/2715
@@ -202,12 +200,10 @@ impl Accessor for HdfsBackend {
         Ok(RpCreateDir::default())
     }
 
-    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
-        use futures::AsyncSeekExt;
-
+    async fn read(&self, path: &str, _: OpRead) -> Result<(RpRead, 
Self::Reader)> {
         let p = build_rooted_abs_path(&self.root, path);
 
-        let mut f = self
+        let f = self
             .client
             .open_file()
             .read(true)
@@ -215,32 +211,7 @@ impl Accessor for HdfsBackend {
             .await
             .map_err(new_std_io_error)?;
 
-        let (start, end) = match (args.range().offset(), args.range().size()) {
-            (None, None) => (0, None),
-            (None, Some(size)) => {
-                let start = f
-                    .seek(SeekFrom::End(size as i64))
-                    .await
-                    .map_err(new_std_io_error)?;
-                (start, Some(start + size))
-            }
-            (Some(offset), None) => {
-                let start = f
-                    .seek(SeekFrom::Start(offset))
-                    .await
-                    .map_err(new_std_io_error)?;
-                (start, None)
-            }
-            (Some(offset), Some(size)) => {
-                let start = f
-                    .seek(SeekFrom::Start(offset))
-                    .await
-                    .map_err(new_std_io_error)?;
-                (start, Some(size))
-            }
-        };
-
-        let r = oio::FileReader::new(f, start, end);
+        let r = oio::FuturesReader::new(f);
 
         Ok((RpRead::new(0), r))
     }
@@ -352,37 +323,17 @@ impl Accessor for HdfsBackend {
         Ok(RpCreateDir::default())
     }
 
-    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::BlockingReader)> {
-        use std::io::Seek;
-
+    fn blocking_read(&self, path: &str, _: OpRead) -> Result<(RpRead, 
Self::BlockingReader)> {
         let p = build_rooted_abs_path(&self.root, path);
 
-        let mut f = self
+        let f = self
             .client
             .open_file()
             .read(true)
             .open(&p)
             .map_err(new_std_io_error)?;
 
-        let (start, end) = match (args.range().offset(), args.range().size()) {
-            (None, None) => (0, None),
-            (None, Some(size)) => {
-                let start = f
-                    .seek(SeekFrom::End(size as i64))
-                    .map_err(new_std_io_error)?;
-                (start, Some(start + size))
-            }
-            (Some(offset), None) => {
-                let start = 
f.seek(SeekFrom::Start(offset)).map_err(new_std_io_error)?;
-                (start, None)
-            }
-            (Some(offset), Some(size)) => {
-                let start = 
f.seek(SeekFrom::Start(offset)).map_err(new_std_io_error)?;
-                (start, Some(size))
-            }
-        };
-
-        let r = oio::FileReader::new(f, start, end);
+        let r = oio::StdReader::new(f);
 
         Ok((RpRead::new(0), r))
     }
diff --git a/core/src/services/sftp/backend.rs 
b/core/src/services/sftp/backend.rs
index 258c77066..5c67dab76 100644
--- a/core/src/services/sftp/backend.rs
+++ b/core/src/services/sftp/backend.rs
@@ -15,11 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use async_compat::Compat;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fmt::Formatter;
-use std::io::SeekFrom;
 use std::path::Path;
 use std::path::PathBuf;
 use std::pin::Pin;
@@ -226,7 +224,7 @@ impl Debug for SftpBackend {
 
 #[async_trait]
 impl Accessor for SftpBackend {
-    type Reader = oio::FileReader<Pin<Box<Compat<TokioCompatFile>>>>;
+    type Reader = oio::TokioReader<Pin<Box<TokioCompatFile>>>;
     type BlockingReader = ();
     type Writer = SftpWriter;
     type BlockingWriter = ();
@@ -241,7 +239,6 @@ impl Accessor for SftpBackend {
                 stat: true,
 
                 read: true,
-                read_with_range: true,
                 read_can_seek: true,
 
                 write: true,
@@ -286,50 +283,22 @@ impl Accessor for SftpBackend {
         return Ok(RpCreateDir::default());
     }
 
-    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
-        use tokio::io::AsyncSeekExt;
-
+    async fn read(&self, path: &str, _: OpRead) -> Result<(RpRead, 
Self::Reader)> {
         let client = self.connect().await?;
 
         let mut fs = client.fs();
         fs.set_cwd(&self.root);
         let path = fs.canonicalize(path).await?;
 
-        let mut f = client.open(path.as_path()).await?;
-
-        let (start, end) = match (args.range().offset(), args.range().size()) {
-            (None, None) => (0, None),
-            (None, Some(size)) => {
-                let start = f
-                    .seek(SeekFrom::End(size as i64))
-                    .await
-                    .map_err(new_std_io_error)?;
-                (start, Some(start + size))
-            }
-            (Some(offset), None) => {
-                let start = f
-                    .seek(SeekFrom::Start(offset))
-                    .await
-                    .map_err(new_std_io_error)?;
-                (start, None)
-            }
-            (Some(offset), Some(size)) => {
-                let start = f
-                    .seek(SeekFrom::Start(offset))
-                    .await
-                    .map_err(new_std_io_error)?;
-                (start, Some(size))
-            }
-        };
+        let f = client.open(path.as_path()).await?;
 
         // Sorry for the ugly code...
         //
         // - `f` is a openssh file.
         // - `TokioCompatFile::new(f)` makes it implements tokio AsyncRead + 
AsyncSeek for openssh File.
-        // - `Compat::new(f)` make it compatible to `futures::AsyncRead + 
futures::AsyncSeek`.
         // - `Box::pin(x)` to make sure this reader implements `Unpin`, since 
`TokioCompatFile` is not.
-        // - `oio::FileReader::new(x)` makes it a `oio::FileReader` which 
implements `oio::Read`.
-        let r = 
oio::FileReader::new(Box::pin(Compat::new(TokioCompatFile::new(f))), start, 
end);
+        // - `oio::TokioReader::new(x)` makes it a `oio::TokioReader` which 
implements `oio::Read`.
+        let r = oio::TokioReader::new(Box::pin(TokioCompatFile::new(f)));
 
         Ok((RpRead::new(0), r))
     }


Reply via email to