This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch lazy-reader
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 2f091123fd50d6f0d3a06222f834589a4baf6c02
Author: Xuanwo <[email protected]>
AuthorDate: Wed Oct 25 20:25:59 2023 +0800

    Implement file reader
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/oio/read/file_read.rs           | 186 ++++++++++++++++++++++++++
 core/src/raw/oio/read/into_read_from_file.rs | 192 ---------------------------
 core/src/raw/oio/read/mod.rs                 |   5 +-
 core/src/services/fs/backend.rs              | 116 +++++++---------
 core/src/services/hdfs/backend.rs            |  88 ++++++------
 core/src/services/sftp/backend.rs            |  90 +++++++++----
 core/src/services/sftp/utils.rs              |  73 ----------
 7 files changed, 345 insertions(+), 405 deletions(-)

diff --git a/core/src/raw/oio/read/file_read.rs 
b/core/src/raw/oio/read/file_read.rs
new file mode 100644
index 000000000..852e7c47d
--- /dev/null
+++ b/core/src/raw/oio/read/file_read.rs
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cmp;
+use std::io::Read;
+use std::io::Seek;
+use std::io::SeekFrom;
+use std::pin::Pin;
+use std::task::ready;
+use std::task::Context;
+use std::task::Poll;
+
+use bytes::Bytes;
+use futures::AsyncRead;
+use futures::AsyncSeek;
+
+use crate::raw::*;
+use crate::*;
+
+/// FileReader implements [`oio::Read`] and [`oio::BlockingRead`] on top of a seekable
+/// file handle, limited to the absolute byte range `[start, end)` of that file.
+///
+/// Seek positions exchanged with callers are relative to `start`.
+pub struct FileReader<R> {
+    /// The wrapped file handle.
+    inner: R,
+
+    /// Absolute position of the first byte of the range.
+    start: u64,
+    /// Absolute position just past the last byte of the range, if bounded.
+    end: Option<u64>,
+
+    /// Absolute position of the inner reader's cursor, as tracked by this reader.
+    offset: u64,
+}
+
+impl<R> FileReader<R> {
+    /// Create a new FileReader.
+    ///
+    /// # Notes
+    ///
+    /// It's required that input reader's cursor is at the input `start` of the file.
+    pub fn new(fd: R, start: u64, end: Option<u64>) -> FileReader<R> {
+        FileReader {
+            inner: fd,
+            start,
+            end,
+
+            offset: start,
+        }
+    }
+
+    /// Translate a caller-relative seek request into a request for the inner reader.
+    ///
+    /// - `SeekFrom::Start(n)` becomes the absolute position `start + n`.
+    /// - `SeekFrom::End(n)` becomes the absolute position `end + n` when `end` is known.
+    ///   Without a known `end` the request is forwarded unchanged, because only the inner
+    ///   reader can resolve it; it cannot be validated against `start` here.
+    /// - `SeekFrom::Current(n)` is forwarded unchanged once the tracked offset is known to
+    ///   stay at or after `start`.
+    ///
+    /// # Errors
+    ///
+    /// Returns `ErrorKind::InvalidInput` if the target lies before `start` or does not fit
+    /// into a `u64`.
+    fn calculate_position(&self, pos: SeekFrom) -> Result<SeekFrom> {
+        let invalid = |message: &'static str| {
+            Error::new(ErrorKind::InvalidInput, message)
+                .with_context("position", format!("{pos:?}"))
+        };
+
+        match pos {
+            SeekFrom::Start(n) => match self.start.checked_add(n) {
+                Some(target) => Ok(SeekFrom::Start(target)),
+                None => Err(invalid("seek to an overflowing position is invalid")),
+            },
+            SeekFrom::End(n) => match self.end {
+                Some(end) => {
+                    // i128 holds every `u64 + i64` sum, so this addition cannot overflow.
+                    let target = i128::from(end) + i128::from(n);
+                    if target < i128::from(self.start) {
+                        return Err(invalid("seek to a negative position is invalid"));
+                    }
+                    if target > i128::from(u64::MAX) {
+                        return Err(invalid("seek to an overflowing position is invalid"));
+                    }
+
+                    // `end` is an absolute position, so the target must be absolute as well
+                    // instead of being relative to the physical end of the file.
+                    Ok(SeekFrom::Start(target as u64))
+                }
+                None => Ok(SeekFrom::End(n)),
+            },
+            SeekFrom::Current(n) => {
+                let target = i128::from(self.offset) + i128::from(n);
+                if target < i128::from(self.start) {
+                    return Err(invalid("seek to a negative position is invalid"));
+                }
+                if target > i128::from(u64::MAX) {
+                    return Err(invalid("seek to an overflowing position is invalid"));
+                }
+
+                Ok(SeekFrom::Current(n))
+            }
+        }
+    }
+}
+
+/// Async read support for [`FileReader`] over any `AsyncRead + AsyncSeek` handle.
+impl<R> oio::Read for FileReader<R>
+where
+    R: AsyncRead + AsyncSeek + Unpin + Send + Sync,
+{
+    /// Read into `buf` without going past `end`.
+    ///
+    /// Returns `Ok(0)` once the tracked offset has reached `end`.
+    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
+        let size = match self.end {
+            Some(end) if self.offset >= end => return Poll::Ready(Ok(0)),
+            // Clamp in `u64` before narrowing: casting the remaining length to `usize`
+            // first would truncate it on targets where `usize` is smaller than `u64`.
+            Some(end) => cmp::min(buf.len() as u64, end - self.offset) as usize,
+            None => buf.len(),
+        };
+
+        let n =
+            ready!(Pin::new(&mut self.inner).poll_read(cx, &mut buf[..size])).map_err(|err| {
+                Error::new(ErrorKind::Unexpected, "read data from FileReader").set_source(err)
+            })?;
+        self.offset += n as u64;
+        Poll::Ready(Ok(n))
+    }
+
+    /// Seek the inner reader and return the new position relative to `start`.
+    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> {
+        let pos = self.calculate_position(pos)?;
+
+        let cur = ready!(Pin::new(&mut self.inner).poll_seek(cx, pos)).map_err(|err| {
+            Error::new(ErrorKind::Unexpected, "seek data from FileReader").set_source(err)
+        })?;
+
+        // Record the absolute position reported by the inner reader.
+        self.offset = cur;
+        Poll::Ready(Ok(self.offset - self.start))
+    }
+
+    /// Streaming is not provided by this reader; always yields an `Unsupported` error.
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
+        let _ = cx;
+
+        Poll::Ready(Some(Err(Error::new(
+            ErrorKind::Unsupported,
+            "output reader doesn't support next",
+        ))))
+    }
+}
+
+/// Blocking read support for [`FileReader`] over any `Read + Seek` handle.
+impl<R> oio::BlockingRead for FileReader<R>
+where
+    R: Read + Seek + Send + Sync + 'static,
+{
+    /// Read into `buf` without going past `end`.
+    ///
+    /// Returns `Ok(0)` once the tracked offset has reached `end`.
+    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+        let size = match self.end {
+            Some(end) if self.offset >= end => return Ok(0),
+            // Clamp in `u64` before narrowing: casting the remaining length to `usize`
+            // first would truncate it on targets where `usize` is smaller than `u64`.
+            Some(end) => cmp::min(buf.len() as u64, end - self.offset) as usize,
+            None => buf.len(),
+        };
+
+        let n = self.inner.read(&mut buf[..size]).map_err(|err| {
+            Error::new(ErrorKind::Unexpected, "read data from FileReader").set_source(err)
+        })?;
+        self.offset += n as u64;
+        Ok(n)
+    }
+
+    /// Seek the inner reader and return the new position relative to `start`.
+    fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
+        let pos = self.calculate_position(pos)?;
+
+        let cur = self.inner.seek(pos).map_err(|err| {
+            Error::new(ErrorKind::Unexpected, "seek data from FileReader").set_source(err)
+        })?;
+
+        // Record the absolute position reported by the inner reader.
+        self.offset = cur;
+        Ok(self.offset - self.start)
+    }
+
+    /// Iteration is not provided by this reader; always yields an `Unsupported` error.
+    fn next(&mut self) -> Option<Result<Bytes>> {
+        Some(Err(Error::new(
+            ErrorKind::Unsupported,
+            "output reader doesn't support iterating",
+        )))
+    }
+}
diff --git a/core/src/raw/oio/read/into_read_from_file.rs 
b/core/src/raw/oio/read/into_read_from_file.rs
deleted file mode 100644
index f005ac737..000000000
--- a/core/src/raw/oio/read/into_read_from_file.rs
+++ /dev/null
@@ -1,192 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::cmp;
-use std::io::Read;
-use std::io::Seek;
-use std::io::SeekFrom;
-use std::pin::Pin;
-use std::task::ready;
-use std::task::Context;
-use std::task::Poll;
-
-use bytes::Bytes;
-use futures::AsyncRead;
-use futures::AsyncSeek;
-
-use crate::raw::*;
-use crate::*;
-
-/// Convert given file into [`oio::Reader`].
-pub fn into_read_from_file<R>(fd: R, start: u64, end: u64) -> 
FromFileReader<R> {
-    FromFileReader {
-        inner: fd,
-        start,
-        end,
-        offset: 0,
-    }
-}
-
-/// FromFileReader is a wrapper of input fd to implement [`oio::Read`].
-pub struct FromFileReader<R> {
-    inner: R,
-
-    start: u64,
-    end: u64,
-    offset: u64,
-}
-
-impl<R> FromFileReader<R> {
-    pub(crate) fn current_size(&self) -> i64 {
-        debug_assert!(self.offset >= self.start, "offset must in range");
-        self.end as i64 - self.offset as i64
-    }
-}
-
-impl<R> oio::Read for FromFileReader<R>
-where
-    R: AsyncRead + AsyncSeek + Unpin + Send + Sync,
-{
-    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> 
Poll<Result<usize>> {
-        if self.current_size() <= 0 {
-            return Poll::Ready(Ok(0));
-        }
-
-        let max = cmp::min(buf.len() as u64, self.current_size() as u64) as 
usize;
-        // TODO: we can use pread instead.
-        let n =
-            ready!(Pin::new(&mut self.inner).poll_read(cx, &mut 
buf[..max])).map_err(|err| {
-                Error::new(ErrorKind::Unexpected, "read data from FdReader")
-                    .with_context("source", "FdReader")
-                    .set_source(err)
-            })?;
-        self.offset += n as u64;
-        Poll::Ready(Ok(n))
-    }
-
-    /// TODO: maybe we don't need to do seek really, just call pread instead.
-    ///
-    /// We need to wait for tokio's pread support.
-    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> 
Poll<Result<u64>> {
-        let (base, offset) = match pos {
-            SeekFrom::Start(n) => (self.start as i64, n as i64),
-            SeekFrom::End(n) => (self.end as i64, n),
-            SeekFrom::Current(n) => (self.offset as i64, n),
-        };
-
-        match base.checked_add(offset) {
-            // Seek to position like `-123` is invalid.
-            Some(n) if n < 0 => Poll::Ready(Err(Error::new(
-                ErrorKind::InvalidInput,
-                "seek to a negative or overflowing position is invalid",
-            )
-            .with_context("position", n.to_string()))),
-            // Seek to position before the start of current file is invalid.
-            Some(n) if n < self.start as i64 => Poll::Ready(Err(Error::new(
-                ErrorKind::InvalidInput,
-                "seek to a position before start of file is invalid",
-            )
-            .with_context("position", n.to_string())
-            .with_context("start", self.start.to_string()))),
-            Some(n) => {
-                let cur =
-                    ready!(Pin::new(&mut self.inner).poll_seek(cx, 
SeekFrom::Start(n as u64)))
-                        .map_err(|err| {
-                            Error::new(ErrorKind::Unexpected, "seek data from 
FdReader")
-                                .with_context("source", "FdReader")
-                                .set_source(err)
-                        })?;
-
-                self.offset = cur;
-                Poll::Ready(Ok(self.offset - self.start))
-            }
-            None => Poll::Ready(Err(Error::new(
-                ErrorKind::InvalidInput,
-                "invalid seek to a negative or overflowing position",
-            ))),
-        }
-    }
-
-    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<Bytes>>> {
-        let _ = cx;
-
-        Poll::Ready(Some(Err(Error::new(
-            ErrorKind::Unsupported,
-            "output reader doesn't support next",
-        ))))
-    }
-}
-
-impl<R> oio::BlockingRead for FromFileReader<R>
-where
-    R: Read + Seek + Send + Sync + 'static,
-{
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
-        if self.current_size() <= 0 {
-            return Ok(0);
-        }
-
-        let max = cmp::min(buf.len() as u64, self.current_size() as u64) as 
usize;
-        // TODO: we can use pread instead.
-        let n = self.inner.read(&mut buf[..max]).map_err(|err| {
-            Error::new(ErrorKind::Unexpected, "read data from FdReader")
-                .with_context("source", "FdReader")
-                .set_source(err)
-        })?;
-        self.offset += n as u64;
-        Ok(n)
-    }
-
-    /// TODO: maybe we don't need to do seek really, just call pread instead.
-    ///
-    /// We need to wait for tokio's pread support.
-    fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
-        let (base, offset) = match pos {
-            SeekFrom::Start(n) => (self.start as i64, n as i64),
-            SeekFrom::End(n) => (self.end as i64, n),
-            SeekFrom::Current(n) => (self.offset as i64, n),
-        };
-
-        match base.checked_add(offset) {
-            Some(n) if n < 0 => Err(Error::new(
-                ErrorKind::InvalidInput,
-                "invalid seek to a negative or overflowing position",
-            )),
-            Some(n) => {
-                let cur = self.inner.seek(SeekFrom::Start(n as 
u64)).map_err(|err| {
-                    Error::new(ErrorKind::Unexpected, "seek data from 
FdReader")
-                        .with_context("source", "FdReader")
-                        .set_source(err)
-                })?;
-
-                self.offset = cur;
-                Ok(self.offset - self.start)
-            }
-            None => Err(Error::new(
-                ErrorKind::InvalidInput,
-                "invalid seek to a negative or overflowing position",
-            )),
-        }
-    }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        Some(Err(Error::new(
-            ErrorKind::Unsupported,
-            "output reader doesn't support iterating",
-        )))
-    }
-}
diff --git a/core/src/raw/oio/read/mod.rs b/core/src/raw/oio/read/mod.rs
index f7d971782..78e888865 100644
--- a/core/src/raw/oio/read/mod.rs
+++ b/core/src/raw/oio/read/mod.rs
@@ -30,9 +30,8 @@ pub use into_streamable_read::StreamableReader;
 mod range_read;
 pub use range_read::RangeReader;
 
-mod into_read_from_file;
-pub use into_read_from_file::into_read_from_file;
-pub use into_read_from_file::FromFileReader;
+mod file_read;
+pub use file_read::FileReader;
 
 mod into_read_from_stream;
 pub use into_read_from_stream::into_read_from_stream;
diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs
index 40ada10f2..9af013cf1 100644
--- a/core/src/services/fs/backend.rs
+++ b/core/src/services/fs/backend.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cmp::min;
 use std::collections::HashMap;
 use std::io::SeekFrom;
 use std::path::Path;
@@ -248,8 +247,8 @@ impl FsBackend {
 
 #[async_trait]
 impl Accessor for FsBackend {
-    type Reader = oio::FromFileReader<Compat<tokio::fs::File>>;
-    type BlockingReader = oio::FromFileReader<std::fs::File>;
+    type Reader = oio::FileReader<Compat<tokio::fs::File>>;
+    type BlockingReader = oio::FileReader<std::fs::File>;
     type Writer = FsWriter<tokio::fs::File>;
     type BlockingWriter = FsWriter<std::fs::File>;
     type Pager = Option<FsPager<tokio::fs::ReadDir>>;
@@ -306,7 +305,7 @@ impl Accessor for FsBackend {
     ///
     /// Benchmark could be found 
[here](https://gist.github.com/Xuanwo/48f9cfbc3022ea5f865388bb62e1a70f)
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
-        use oio::ReadExt;
+        use tokio::io::AsyncSeekExt;
 
         let p = self.root.join(path.trim_end_matches('/'));
 
@@ -316,7 +315,7 @@ impl Accessor for FsBackend {
             .await
             .map_err(parse_io_error)?;
 
-        let total_length = if self.enable_path_check {
+        if self.enable_path_check {
             // Get fs metadata of file at given path, ensuring it is not a 
false-positive due to slash normalization.
             let meta = f.metadata().await.map_err(parse_io_error)?;
             if meta.is_dir() != path.ends_with('/') {
@@ -331,41 +330,36 @@ impl Accessor for FsBackend {
                     "given path is a directory",
                 ));
             }
+        }
 
-            meta.len()
-        } else {
-            use tokio::io::AsyncSeekExt;
-
-            f.seek(SeekFrom::End(0)).await.map_err(parse_io_error)?
-        };
-
-        let f = Compat::new(f);
-
-        let br = args.range();
-        let (start, end) = match (br.offset(), br.size()) {
-            // Read a specific range.
-            (Some(offset), Some(size)) => (offset, min(offset + size, 
total_length)),
-            // Read from offset.
-            (Some(offset), None) => (offset, total_length),
-            // Read the last size bytes.
-            (None, Some(size)) => (
-                if total_length > size {
-                    total_length - size
-                } else {
-                    0
-                },
-                total_length,
-            ),
-            // Read the whole file.
-            (None, None) => (0, total_length),
+        let (start, end) = match (args.range().offset(), args.range().size()) {
+            (None, None) => (0, None),
+            (None, Some(size)) => {
+                let start = f
+                    .seek(SeekFrom::End(size as i64))
+                    .await
+                    .map_err(parse_io_error)?;
+                (start, Some(start + size))
+            }
+            (Some(offset), None) => {
+                let start = f
+                    .seek(SeekFrom::Start(offset))
+                    .await
+                    .map_err(parse_io_error)?;
+                (start, None)
+            }
+            (Some(offset), Some(size)) => {
+                let start = f
+                    .seek(SeekFrom::Start(offset))
+                    .await
+                    .map_err(parse_io_error)?;
+                (start, Some(size))
+            }
         };
 
-        let mut r = oio::into_read_from_file(f, start, end);
+        let r = oio::FileReader::new(Compat::new(f), start, end);
 
-        // Rewind to make sure we are on the correct offset.
-        r.seek(SeekFrom::Start(0)).await?;
-
-        Ok((RpRead::new(end - start), r))
+        Ok((RpRead::new(0), r))
     }
 
     async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
@@ -510,7 +504,7 @@ impl Accessor for FsBackend {
     }
 
     fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::BlockingReader)> {
-        use oio::BlockingRead;
+        use std::io::Seek;
 
         let p = self.root.join(path.trim_end_matches('/'));
 
@@ -519,7 +513,7 @@ impl Accessor for FsBackend {
             .open(p)
             .map_err(parse_io_error)?;
 
-        let total_length = if self.enable_path_check {
+        if self.enable_path_check {
             // Get fs metadata of file at given path, ensuring it is not a 
false-positive due to slash normalization.
             let meta = f.metadata().map_err(parse_io_error)?;
             if meta.is_dir() != path.ends_with('/') {
@@ -534,39 +528,27 @@ impl Accessor for FsBackend {
                     "given path is a directory",
                 ));
             }
+        }
 
-            meta.len()
-        } else {
-            use std::io::Seek;
-
-            f.seek(SeekFrom::End(0)).map_err(parse_io_error)?
-        };
-
-        let br = args.range();
-        let (start, end) = match (br.offset(), br.size()) {
-            // Read a specific range.
-            (Some(offset), Some(size)) => (offset, min(offset + size, 
total_length)),
-            // Read from offset.
-            (Some(offset), None) => (offset, total_length),
-            // Read the last size bytes.
-            (None, Some(size)) => (
-                if total_length > size {
-                    total_length - size
-                } else {
-                    0
-                },
-                total_length,
-            ),
-            // Read the whole file.
-            (None, None) => (0, total_length),
+        let (start, end) = match (args.range().offset(), args.range().size()) {
+            (None, None) => (0, None),
+            (None, Some(size)) => {
+                let start = f.seek(SeekFrom::End(size as 
i64)).map_err(parse_io_error)?;
+                (start, Some(start + size))
+            }
+            (Some(offset), None) => {
+                let start = 
f.seek(SeekFrom::Start(offset)).map_err(parse_io_error)?;
+                (start, None)
+            }
+            (Some(offset), Some(size)) => {
+                let start = 
f.seek(SeekFrom::Start(offset)).map_err(parse_io_error)?;
+                (start, Some(size))
+            }
         };
 
-        let mut r: oio::FromFileReader<std::fs::File> = 
oio::into_read_from_file(f, start, end);
-
-        // Rewind to make sure we are on the correct offset.
-        r.seek(SeekFrom::Start(0))?;
+        let r = oio::FileReader::new(f, start, end);
 
-        Ok((RpRead::new(end - start), r))
+        Ok((RpRead::new(0), r))
     }
 
     fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, 
Self::BlockingWriter)> {
diff --git a/core/src/services/hdfs/backend.rs 
b/core/src/services/hdfs/backend.rs
index a6078c89e..4d093656b 100644
--- a/core/src/services/hdfs/backend.rs
+++ b/core/src/services/hdfs/backend.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cmp::min;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::io;
@@ -160,8 +159,8 @@ unsafe impl Sync for HdfsBackend {}
 
 #[async_trait]
 impl Accessor for HdfsBackend {
-    type Reader = oio::FromFileReader<hdrs::AsyncFile>;
-    type BlockingReader = oio::FromFileReader<hdrs::File>;
+    type Reader = oio::FileReader<hdrs::AsyncFile>;
+    type BlockingReader = oio::FileReader<hdrs::File>;
     type Writer = HdfsWriter<hdrs::AsyncFile>;
     type BlockingWriter = HdfsWriter<hdrs::File>;
     type Pager = Option<HdfsPager>;
@@ -205,14 +204,11 @@ impl Accessor for HdfsBackend {
     }
 
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
-        use oio::ReadExt;
+        use futures::AsyncSeekExt;
 
         let p = build_rooted_abs_path(&self.root, path);
 
-        // This will be addressed by 
https://github.com/apache/incubator-opendal/issues/506
-        let meta = self.client.metadata(&p).map_err(parse_io_error)?;
-
-        let f = self
+        let mut f = self
             .client
             .open_file()
             .read(true)
@@ -220,23 +216,34 @@ impl Accessor for HdfsBackend {
             .await
             .map_err(parse_io_error)?;
 
-        let br = args.range();
-        let (start, end) = match (br.offset(), br.size()) {
-            // Read a specific range.
-            (Some(offset), Some(size)) => (offset, min(offset + size, 
meta.len())),
-            // Read from offset.
-            (Some(offset), None) => (offset, meta.len()),
-            // Read the last size bytes.
-            (None, Some(size)) => (meta.len() - size, meta.len()),
-            // Read the whole file.
-            (None, None) => (0, meta.len()),
+        let (start, end) = match (args.range().offset(), args.range().size()) {
+            (None, None) => (0, None),
+            (None, Some(size)) => {
+                let start = f
+                    .seek(SeekFrom::End(size as i64))
+                    .await
+                    .map_err(parse_io_error)?;
+                (start, Some(start + size))
+            }
+            (Some(offset), None) => {
+                let start = f
+                    .seek(SeekFrom::Start(offset))
+                    .await
+                    .map_err(parse_io_error)?;
+                (start, None)
+            }
+            (Some(offset), Some(size)) => {
+                let start = f
+                    .seek(SeekFrom::Start(offset))
+                    .await
+                    .map_err(parse_io_error)?;
+                (start, Some(size))
+            }
         };
 
-        let mut r = oio::into_read_from_file(f, start, end);
-        // Rewind to make sure we are on the correct offset.
-        r.seek(SeekFrom::Start(0)).await?;
+        let r = oio::FileReader::new(f, start, end);
 
-        Ok((RpRead::new(end - start), r))
+        Ok((RpRead::new(0), r))
     }
 
     async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
@@ -344,37 +351,36 @@ impl Accessor for HdfsBackend {
     }
 
     fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::BlockingReader)> {
-        use oio::BlockingRead;
+        use std::io::Seek;
 
         let p = build_rooted_abs_path(&self.root, path);
 
-        // This will be addressed by 
https://github.com/apache/incubator-opendal/issues/506
-        let meta = self.client.metadata(&p).map_err(parse_io_error)?;
-
-        let f = self
+        let mut f = self
             .client
             .open_file()
             .read(true)
             .open(&p)
             .map_err(parse_io_error)?;
 
-        let br = args.range();
-        let (start, end) = match (br.offset(), br.size()) {
-            // Read a specific range.
-            (Some(offset), Some(size)) => (offset, min(offset + size, 
meta.len())),
-            // Read from offset.
-            (Some(offset), None) => (offset, meta.len()),
-            // Read the last size bytes.
-            (None, Some(size)) => (meta.len() - size, meta.len()),
-            // Read the whole file.
-            (None, None) => (0, meta.len()),
+        let (start, end) = match (args.range().offset(), args.range().size()) {
+            (None, None) => (0, None),
+            (None, Some(size)) => {
+                let start = f.seek(SeekFrom::End(size as 
i64)).map_err(parse_io_error)?;
+                (start, Some(start + size))
+            }
+            (Some(offset), None) => {
+                let start = 
f.seek(SeekFrom::Start(offset)).map_err(parse_io_error)?;
+                (start, None)
+            }
+            (Some(offset), Some(size)) => {
+                let start = 
f.seek(SeekFrom::Start(offset)).map_err(parse_io_error)?;
+                (start, Some(size))
+            }
         };
 
-        let mut r = oio::into_read_from_file(f, start, end);
-        // Rewind to make sure we are on the correct offset.
-        r.seek(SeekFrom::Start(0))?;
+        let r = oio::FileReader::new(f, start, end);
 
-        Ok((RpRead::new(end - start), r))
+        Ok((RpRead::new(0), r))
     }
 
     fn blocking_write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, 
Self::BlockingWriter)> {
diff --git a/core/src/services/sftp/backend.rs 
b/core/src/services/sftp/backend.rs
index 0f9b7f3bd..774221534 100644
--- a/core/src/services/sftp/backend.rs
+++ b/core/src/services/sftp/backend.rs
@@ -15,25 +15,27 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cmp::min;
+use async_compat::Compat;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fmt::Formatter;
+use std::io::SeekFrom;
 use std::path::Path;
 use std::path::PathBuf;
+use std::pin::Pin;
 
 use async_trait::async_trait;
 use futures::StreamExt;
 use log::debug;
 use openssh::KnownHosts;
 use openssh::SessionBuilder;
+use openssh_sftp_client::file::TokioCompatFile;
 use openssh_sftp_client::Sftp;
 use openssh_sftp_client::SftpOptions;
 
 use super::error::is_not_found;
 use super::error::is_sftp_protocol_error;
 use super::pager::SftpPager;
-use super::utils::SftpReader;
 use super::writer::SftpWriter;
 use crate::raw::*;
 use crate::*;
@@ -224,7 +226,7 @@ impl Debug for SftpBackend {
 
 #[async_trait]
 impl Accessor for SftpBackend {
-    type Reader = SftpReader;
+    type Reader = oio::FileReader<Pin<Box<Compat<TokioCompatFile>>>>;
     type BlockingReader = ();
     type Writer = SftpWriter;
     type BlockingWriter = ();
@@ -285,41 +287,51 @@ impl Accessor for SftpBackend {
     }
 
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
+        use tokio::io::AsyncSeekExt;
+
         let client = self.connect().await?;
 
         let mut fs = client.fs();
         fs.set_cwd(&self.root);
         let path = fs.canonicalize(path).await?;
 
-        let mut file = client.open(path.as_path()).await?;
-
-        let total_length = file.metadata().await?.len().ok_or(Error::new(
-            ErrorKind::NotFound,
-            format!("file not found: {}", path.to_str().unwrap()).as_str(),
-        ))?;
-
-        let br = args.range();
-        let (start, end) = match (br.offset(), br.size()) {
-            // Read a specific range.
-            (Some(offset), Some(size)) => (offset, min(offset + size, 
total_length)),
-            // Read from offset.
-            (Some(offset), None) => (offset, total_length),
-            // Read the last size bytes.
-            (None, Some(size)) => (
-                if total_length > size {
-                    total_length - size
-                } else {
-                    0
-                },
-                total_length,
-            ),
-            // Read the whole file.
-            (None, None) => (0, total_length),
+        let mut f = client.open(path.as_path()).await?;
+
+        let (start, end) = match (args.range().offset(), args.range().size()) {
+            (None, None) => (0, None),
+            (None, Some(size)) => {
+                let start = f
+                    .seek(SeekFrom::End(size as i64))
+                    .await
+                    .map_err(parse_io_error)?;
+                (start, Some(start + size))
+            }
+            (Some(offset), None) => {
+                let start = f
+                    .seek(SeekFrom::Start(offset))
+                    .await
+                    .map_err(parse_io_error)?;
+                (start, None)
+            }
+            (Some(offset), Some(size)) => {
+                let start = f
+                    .seek(SeekFrom::Start(offset))
+                    .await
+                    .map_err(parse_io_error)?;
+                (start, Some(size))
+            }
         };
 
-        let r = SftpReader::new(file, start, end).await?;
+        // Sorry for the ugly code...
+        //
+        // - `f` is an openssh file.
+        // - `TokioCompatFile::new(f)` makes the openssh file implement tokio's
+        //   `AsyncRead + AsyncSeek`.
+        // - `Compat::new(f)` makes it compatible with `futures::AsyncRead + futures::AsyncSeek`.
+        // - `Box::pin(x)` makes sure this reader implements `Unpin`, since `TokioCompatFile`
+        //   does not.
+        // - `oio::FileReader::new(x)` makes it an `oio::FileReader`, which implements `oio::Read`.
+        let r = 
oio::FileReader::new(Box::pin(Compat::new(TokioCompatFile::new(f))), start, 
end);
 
-        Ok((RpRead::new(end - start), r))
+        Ok((RpRead::new(0), r))
     }
 
     async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
@@ -546,3 +558,23 @@ async fn connect_sftp(
 
     Ok(sftp)
 }
+
+/// Parse all io related errors.
+pub fn parse_io_error(err: std::io::Error) -> Error {
+    use std::io::ErrorKind::*;
+
+    let (kind, retryable) = match err.kind() {
+        NotFound => (ErrorKind::NotFound, false),
+        PermissionDenied => (ErrorKind::PermissionDenied, false),
+        Interrupted | UnexpectedEof | TimedOut | WouldBlock => 
(ErrorKind::Unexpected, true),
+        _ => (ErrorKind::Unexpected, true),
+    };
+
+    let mut err = Error::new(kind, &err.kind().to_string()).set_source(err);
+
+    if retryable {
+        err = err.set_temporary();
+    }
+
+    err
+}
diff --git a/core/src/services/sftp/utils.rs b/core/src/services/sftp/utils.rs
index fc8bb8a6f..5e627b89a 100644
--- a/core/src/services/sftp/utils.rs
+++ b/core/src/services/sftp/utils.rs
@@ -15,83 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::io::SeekFrom;
-use std::pin::Pin;
-use std::task::Context;
-use std::task::Poll;
-
-use async_compat::Compat;
-use futures::AsyncBufRead;
-use futures::AsyncRead;
-use futures::AsyncSeek;
-use openssh_sftp_client::file::File;
-use openssh_sftp_client::file::TokioCompatFile;
 use openssh_sftp_client::metadata::MetaData as SftpMeta;
 
-use crate::raw::oio;
-use crate::raw::oio::FromFileReader;
-use crate::raw::oio::ReadExt;
 use crate::EntryMode;
 use crate::Metadata;
-use crate::Result;
-
-pub struct SftpReaderInner {
-    file: Pin<Box<Compat<TokioCompatFile>>>,
-}
-pub type SftpReader = FromFileReader<SftpReaderInner>;
-
-impl SftpReaderInner {
-    pub async fn new(file: File) -> Self {
-        let file = Compat::new(file.into());
-        Self {
-            file: Box::pin(file),
-        }
-    }
-}
-
-impl SftpReader {
-    /// Create a new reader from a file, starting at the given offset and 
ending at the given offset.
-    pub async fn new(file: File, start: u64, end: u64) -> Result<Self> {
-        let file = SftpReaderInner::new(file).await;
-        let mut r = oio::into_read_from_file(file, start, end);
-        r.seek(SeekFrom::Start(0)).await?;
-        Ok(r)
-    }
-}
-
-impl AsyncRead for SftpReaderInner {
-    fn poll_read(
-        self: Pin<&mut Self>,
-        cx: &mut Context,
-        buf: &mut [u8],
-    ) -> Poll<std::io::Result<usize>> {
-        let this = self.get_mut();
-        Pin::new(&mut this.file).poll_read(cx, buf)
-    }
-}
-
-impl AsyncBufRead for SftpReaderInner {
-    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> 
Poll<std::io::Result<&[u8]>> {
-        let this = self.get_mut();
-        Pin::new(&mut this.file).poll_fill_buf(cx)
-    }
-
-    fn consume(self: Pin<&mut Self>, amt: usize) {
-        let this = self.get_mut();
-        Pin::new(&mut this.file).consume(amt)
-    }
-}
-
-impl AsyncSeek for SftpReaderInner {
-    fn poll_seek(
-        self: Pin<&mut Self>,
-        cx: &mut Context,
-        pos: SeekFrom,
-    ) -> Poll<std::io::Result<u64>> {
-        let this = self.get_mut();
-        Pin::new(&mut this.file).poll_seek(cx, pos)
-    }
-}
 
 impl From<SftpMeta> for Metadata {
     fn from(meta: SftpMeta) -> Self {

Reply via email to