This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch lazy-reader in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 2f091123fd50d6f0d3a06222f834589a4baf6c02 Author: Xuanwo <[email protected]> AuthorDate: Wed Oct 25 20:25:59 2023 +0800 Implement file reader Signed-off-by: Xuanwo <[email protected]> --- core/src/raw/oio/read/file_read.rs | 186 ++++++++++++++++++++++++++ core/src/raw/oio/read/into_read_from_file.rs | 192 --------------------------- core/src/raw/oio/read/mod.rs | 5 +- core/src/services/fs/backend.rs | 116 +++++++--------- core/src/services/hdfs/backend.rs | 88 ++++++------ core/src/services/sftp/backend.rs | 90 +++++++++---- core/src/services/sftp/utils.rs | 73 ---------- 7 files changed, 345 insertions(+), 405 deletions(-) diff --git a/core/src/raw/oio/read/file_read.rs b/core/src/raw/oio/read/file_read.rs new file mode 100644 index 000000000..852e7c47d --- /dev/null +++ b/core/src/raw/oio/read/file_read.rs @@ -0,0 +1,186 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::cmp; +use std::io::Read; +use std::io::Seek; +use std::io::SeekFrom; +use std::pin::Pin; +use std::task::ready; +use std::task::Context; +use std::task::Poll; + +use bytes::Bytes; +use futures::AsyncRead; +use futures::AsyncSeek; + +use crate::raw::*; +use crate::*; + +/// FileReader implements [`oio::Read`] via `AsyncRead + AsyncSeek`. +pub struct FileReader<R> { + inner: R, + + start: u64, + end: Option<u64>, + + offset: u64, +} + +impl<R> FileReader<R> { + /// Create a new FileReader. + /// + /// # Notes + /// + /// It's required that input reader's cursor is at the input `start` of the file. + pub fn new(fd: R, start: u64, end: Option<u64>) -> FileReader<R> { + FileReader { + inner: fd, + start, + end, + + offset: start, + } + } + + fn calculate_position(&self, pos: SeekFrom) -> Result<SeekFrom> { + match pos { + SeekFrom::Start(n) => { + if n < self.start { + return Err(Error::new( + ErrorKind::InvalidInput, + "seek to a negative position is invalid", + ) + .with_context("position", format!("{pos:?}"))); + } + + Ok(SeekFrom::Start(self.start + n)) + } + SeekFrom::End(n) => { + let end = if let Some(end) = self.end { + end as i64 + n + } else { + n + }; + + if self.start as i64 + end < 0 { + return Err(Error::new( + ErrorKind::InvalidInput, + "seek to a negative position is invalid", + ) + .with_context("position", format!("{pos:?}"))); + } + + Ok(SeekFrom::End(end)) + } + SeekFrom::Current(n) => { + if self.offset as i64 + n < self.start as i64 { + return Err(Error::new( + ErrorKind::InvalidInput, + "seek to a negative position is invalid", + ) + .with_context("position", format!("{pos:?}"))); + } + + Ok(SeekFrom::Current(n)) + } + } + } +} + +impl<R> oio::Read for FileReader<R> +where + R: AsyncRead + AsyncSeek + Unpin + Send + Sync, +{ + fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> { + let size = if let Some(end) = self.end { + if self.offset >= end { + return Poll::Ready(Ok(0)); + } + cmp::min(buf.len(), (end - self.offset) as usize) + } else { + buf.len() + }; + + let n = + ready!(Pin::new(&mut self.inner).poll_read(cx, &mut buf[..size])).map_err(|err| { + Error::new(ErrorKind::Unexpected, "read data from FileReader").set_source(err) + })?; + self.offset += n as u64; + Poll::Ready(Ok(n)) + } + + fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> { + let pos = self.calculate_position(pos)?; + + let cur = ready!(Pin::new(&mut self.inner).poll_seek(cx, pos)).map_err(|err| { + Error::new(ErrorKind::Unexpected, "seek data from FileReader").set_source(err) + })?; + + self.offset = cur; + Poll::Ready(Ok(self.offset - self.start)) + } + + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { + let _ = cx; + + Poll::Ready(Some(Err(Error::new( + ErrorKind::Unsupported, + "output reader doesn't support next", + )))) + } +} + +impl<R> oio::BlockingRead for FileReader<R> +where + R: Read + Seek + Send + Sync + 'static, +{ + fn read(&mut self, buf: &mut [u8]) -> Result<usize> { + let size = if let Some(end) = self.end { + if self.offset >= end { + return Ok(0); + } + cmp::min(buf.len(), (end - self.offset) as usize) + } else { + buf.len() + }; + + let n = self.inner.read(&mut buf[..size]).map_err(|err| { + Error::new(ErrorKind::Unexpected, "read data from FileReader").set_source(err) + })?; + self.offset += n as u64; + Ok(n) + } + + fn seek(&mut self, pos: SeekFrom) -> Result<u64> { + let pos = self.calculate_position(pos)?; + + let cur = self.inner.seek(pos).map_err(|err| { + Error::new(ErrorKind::Unexpected, "seek data from FileReader").set_source(err) + })?; + + self.offset = cur; + Ok(self.offset - self.start) + } + + fn next(&mut self) -> Option<Result<Bytes>> { + Some(Err(Error::new( + ErrorKind::Unsupported, + "output reader doesn't support iterating", + ))) + } +} diff --git a/core/src/raw/oio/read/into_read_from_file.rs b/core/src/raw/oio/read/into_read_from_file.rs deleted file mode 100644 index f005ac737..000000000 --- a/core/src/raw/oio/read/into_read_from_file.rs +++ /dev/null @@ -1,192 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::cmp; -use std::io::Read; -use std::io::Seek; -use std::io::SeekFrom; -use std::pin::Pin; -use std::task::ready; -use std::task::Context; -use std::task::Poll; - -use bytes::Bytes; -use futures::AsyncRead; -use futures::AsyncSeek; - -use crate::raw::*; -use crate::*; - -/// Convert given file into [`oio::Reader`]. -pub fn into_read_from_file<R>(fd: R, start: u64, end: u64) -> FromFileReader<R> { - FromFileReader { - inner: fd, - start, - end, - offset: 0, - } -} - -/// FromFileReader is a wrapper of input fd to implement [`oio::Read`]. -pub struct FromFileReader<R> { - inner: R, - - start: u64, - end: u64, - offset: u64, -} - -impl<R> FromFileReader<R> { - pub(crate) fn current_size(&self) -> i64 { - debug_assert!(self.offset >= self.start, "offset must in range"); - self.end as i64 - self.offset as i64 - } -} - -impl<R> oio::Read for FromFileReader<R> -where - R: AsyncRead + AsyncSeek + Unpin + Send + Sync, -{ - fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> { - if self.current_size() <= 0 { - return Poll::Ready(Ok(0)); - } - - let max = cmp::min(buf.len() as u64, self.current_size() as u64) as usize; - // TODO: we can use pread instead. - let n = - ready!(Pin::new(&mut self.inner).poll_read(cx, &mut buf[..max])).map_err(|err| { - Error::new(ErrorKind::Unexpected, "read data from FdReader") - .with_context("source", "FdReader") - .set_source(err) - })?; - self.offset += n as u64; - Poll::Ready(Ok(n)) - } - - /// TODO: maybe we don't need to do seek really, just call pread instead. - /// - /// We need to wait for tokio's pread support. - fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> { - let (base, offset) = match pos { - SeekFrom::Start(n) => (self.start as i64, n as i64), - SeekFrom::End(n) => (self.end as i64, n), - SeekFrom::Current(n) => (self.offset as i64, n), - }; - - match base.checked_add(offset) { - // Seek to position like `-123` is invalid. - Some(n) if n < 0 => Poll::Ready(Err(Error::new( - ErrorKind::InvalidInput, - "seek to a negative or overflowing position is invalid", - ) - .with_context("position", n.to_string()))), - // Seek to position before the start of current file is invalid. - Some(n) if n < self.start as i64 => Poll::Ready(Err(Error::new( - ErrorKind::InvalidInput, - "seek to a position before start of file is invalid", - ) - .with_context("position", n.to_string()) - .with_context("start", self.start.to_string()))), - Some(n) => { - let cur = - ready!(Pin::new(&mut self.inner).poll_seek(cx, SeekFrom::Start(n as u64))) - .map_err(|err| { - Error::new(ErrorKind::Unexpected, "seek data from FdReader") - .with_context("source", "FdReader") - .set_source(err) - })?; - - self.offset = cur; - Poll::Ready(Ok(self.offset - self.start)) - } - None => Poll::Ready(Err(Error::new( - ErrorKind::InvalidInput, - "invalid seek to a negative or overflowing position", - ))), - } - } - - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { - let _ = cx; - - Poll::Ready(Some(Err(Error::new( - ErrorKind::Unsupported, - "output reader doesn't support next", - )))) - } -} - -impl<R> oio::BlockingRead for FromFileReader<R> -where - R: Read + Seek + Send + Sync + 'static, -{ - fn read(&mut self, buf: &mut [u8]) -> Result<usize> { - if self.current_size() <= 0 { - return Ok(0); - } - - let max = cmp::min(buf.len() as u64, self.current_size() as u64) as usize; - // TODO: we can use pread instead. - let n = self.inner.read(&mut buf[..max]).map_err(|err| { - Error::new(ErrorKind::Unexpected, "read data from FdReader") - .with_context("source", "FdReader") - .set_source(err) - })?; - self.offset += n as u64; - Ok(n) - } - - /// TODO: maybe we don't need to do seek really, just call pread instead. - /// - /// We need to wait for tokio's pread support. - fn seek(&mut self, pos: SeekFrom) -> Result<u64> { - let (base, offset) = match pos { - SeekFrom::Start(n) => (self.start as i64, n as i64), - SeekFrom::End(n) => (self.end as i64, n), - SeekFrom::Current(n) => (self.offset as i64, n), - }; - - match base.checked_add(offset) { - Some(n) if n < 0 => Err(Error::new( - ErrorKind::InvalidInput, - "invalid seek to a negative or overflowing position", - )), - Some(n) => { - let cur = self.inner.seek(SeekFrom::Start(n as u64)).map_err(|err| { - Error::new(ErrorKind::Unexpected, "seek data from FdReader") - .with_context("source", "FdReader") - .set_source(err) - })?; - - self.offset = cur; - Ok(self.offset - self.start) - } - None => Err(Error::new( - ErrorKind::InvalidInput, - "invalid seek to a negative or overflowing position", - )), - } - } - - fn next(&mut self) -> Option<Result<Bytes>> { - Some(Err(Error::new( - ErrorKind::Unsupported, - "output reader doesn't support iterating", - ))) - } -} diff --git a/core/src/raw/oio/read/mod.rs b/core/src/raw/oio/read/mod.rs index f7d971782..78e888865 100644 --- a/core/src/raw/oio/read/mod.rs +++ b/core/src/raw/oio/read/mod.rs @@ -30,9 +30,8 @@ pub use into_streamable_read::StreamableReader; mod range_read; pub use range_read::RangeReader; -mod into_read_from_file; -pub use into_read_from_file::into_read_from_file; -pub use into_read_from_file::FromFileReader; +mod file_read; +pub use file_read::FileReader; mod into_read_from_stream; pub use into_read_from_stream::into_read_from_stream; diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs index 40ada10f2..9af013cf1 100644 --- a/core/src/services/fs/backend.rs +++ b/core/src/services/fs/backend.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::cmp::min; use std::collections::HashMap; use std::io::SeekFrom; use std::path::Path; @@ -248,8 +247,8 @@ impl FsBackend { #[async_trait] impl Accessor for FsBackend { - type Reader = oio::FromFileReader<Compat<tokio::fs::File>>; - type BlockingReader = oio::FromFileReader<std::fs::File>; + type Reader = oio::FileReader<Compat<tokio::fs::File>>; + type BlockingReader = oio::FileReader<std::fs::File>; type Writer = FsWriter<tokio::fs::File>; type BlockingWriter = FsWriter<std::fs::File>; type Pager = Option<FsPager<tokio::fs::ReadDir>>; @@ -306,7 +305,7 @@ impl Accessor for FsBackend { /// /// Benchmark could be found [here](https://gist.github.com/Xuanwo/48f9cfbc3022ea5f865388bb62e1a70f) async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - use oio::ReadExt; + use tokio::io::AsyncSeekExt; let p = self.root.join(path.trim_end_matches('/')); @@ -316,7 +315,7 @@ impl Accessor for FsBackend { .await .map_err(parse_io_error)?; - let total_length = if self.enable_path_check { + if self.enable_path_check { // Get fs metadata of file at given path, ensuring it is not a false-positive due to slash normalization. let meta = f.metadata().await.map_err(parse_io_error)?; if meta.is_dir() != path.ends_with('/') { @@ -331,41 +330,36 @@ impl Accessor for FsBackend { "given path is a directory", )); } + } - meta.len() - } else { - use tokio::io::AsyncSeekExt; - - f.seek(SeekFrom::End(0)).await.map_err(parse_io_error)? - }; - - let f = Compat::new(f); - - let br = args.range(); - let (start, end) = match (br.offset(), br.size()) { - // Read a specific range. - (Some(offset), Some(size)) => (offset, min(offset + size, total_length)), - // Read from offset. - (Some(offset), None) => (offset, total_length), - // Read the last size bytes. - (None, Some(size)) => ( - if total_length > size { - total_length - size - } else { - 0 - }, - total_length, - ), - // Read the whole file. - (None, None) => (0, total_length), + let (start, end) = match (args.range().offset(), args.range().size()) { + (None, None) => (0, None), + (None, Some(size)) => { + let start = f + .seek(SeekFrom::End(size as i64)) + .await + .map_err(parse_io_error)?; + (start, Some(start + size)) + } + (Some(offset), None) => { + let start = f + .seek(SeekFrom::Start(offset)) + .await + .map_err(parse_io_error)?; + (start, None) + } + (Some(offset), Some(size)) => { + let start = f + .seek(SeekFrom::Start(offset)) + .await + .map_err(parse_io_error)?; + (start, Some(size)) + } }; - let mut r = oio::into_read_from_file(f, start, end); + let r = oio::FileReader::new(Compat::new(f), start, end); - // Rewind to make sure we are on the correct offset. - r.seek(SeekFrom::Start(0)).await?; - - Ok((RpRead::new(end - start), r)) + Ok((RpRead::new(0), r)) } async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> { @@ -510,7 +504,7 @@ impl Accessor for FsBackend { } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - use oio::BlockingRead; + use std::io::Seek; let p = self.root.join(path.trim_end_matches('/')); @@ -519,7 +513,7 @@ impl Accessor for FsBackend { .open(p) .map_err(parse_io_error)?; - let total_length = if self.enable_path_check { + if self.enable_path_check { // Get fs metadata of file at given path, ensuring it is not a false-positive due to slash normalization. let meta = f.metadata().map_err(parse_io_error)?; if meta.is_dir() != path.ends_with('/') { @@ -534,39 +528,27 @@ impl Accessor for FsBackend { "given path is a directory", )); } + } - meta.len() - } else { - use std::io::Seek; - - f.seek(SeekFrom::End(0)).map_err(parse_io_error)? - }; - - let br = args.range(); - let (start, end) = match (br.offset(), br.size()) { - // Read a specific range. - (Some(offset), Some(size)) => (offset, min(offset + size, total_length)), - // Read from offset. - (Some(offset), None) => (offset, total_length), - // Read the last size bytes. - (None, Some(size)) => ( - if total_length > size { - total_length - size - } else { - 0 - }, - total_length, - ), - // Read the whole file. - (None, None) => (0, total_length), + let (start, end) = match (args.range().offset(), args.range().size()) { + (None, None) => (0, None), + (None, Some(size)) => { + let start = f.seek(SeekFrom::End(size as i64)).map_err(parse_io_error)?; + (start, Some(start + size)) + } + (Some(offset), None) => { + let start = f.seek(SeekFrom::Start(offset)).map_err(parse_io_error)?; + (start, None) + } + (Some(offset), Some(size)) => { + let start = f.seek(SeekFrom::Start(offset)).map_err(parse_io_error)?; + (start, Some(size)) + } }; - let mut r: oio::FromFileReader<std::fs::File> = oio::into_read_from_file(f, start, end); - - // Rewind to make sure we are on the correct offset. - r.seek(SeekFrom::Start(0))?; + let r = oio::FileReader::new(f, start, end); - Ok((RpRead::new(end - start), r)) + Ok((RpRead::new(0), r)) } fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs index a6078c89e..4d093656b 100644 --- a/core/src/services/hdfs/backend.rs +++ b/core/src/services/hdfs/backend.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::cmp::min; use std::collections::HashMap; use std::fmt::Debug; use std::io; @@ -160,8 +159,8 @@ unsafe impl Sync for HdfsBackend {} #[async_trait] impl Accessor for HdfsBackend { - type Reader = oio::FromFileReader<hdrs::AsyncFile>; - type BlockingReader = oio::FromFileReader<hdrs::File>; + type Reader = oio::FileReader<hdrs::AsyncFile>; + type BlockingReader = oio::FileReader<hdrs::File>; type Writer = HdfsWriter<hdrs::AsyncFile>; type BlockingWriter = HdfsWriter<hdrs::File>; type Pager = Option<HdfsPager>; @@ -205,14 +204,11 @@ impl Accessor for HdfsBackend { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - use oio::ReadExt; + use futures::AsyncSeekExt; let p = build_rooted_abs_path(&self.root, path); - // This will be addressed by https://github.com/apache/incubator-opendal/issues/506 - let meta = self.client.metadata(&p).map_err(parse_io_error)?; - - let f = self + let mut f = self .client .open_file() .read(true) @@ -220,23 +216,34 @@ impl Accessor for HdfsBackend { .await .map_err(parse_io_error)?; - let br = args.range(); - let (start, end) = match (br.offset(), br.size()) { - // Read a specific range. - (Some(offset), Some(size)) => (offset, min(offset + size, meta.len())), - // Read from offset. - (Some(offset), None) => (offset, meta.len()), - // Read the last size bytes. - (None, Some(size)) => (meta.len() - size, meta.len()), - // Read the whole file. - (None, None) => (0, meta.len()), + let (start, end) = match (args.range().offset(), args.range().size()) { + (None, None) => (0, None), + (None, Some(size)) => { + let start = f + .seek(SeekFrom::End(size as i64)) + .await + .map_err(parse_io_error)?; + (start, Some(start + size)) + } + (Some(offset), None) => { + let start = f + .seek(SeekFrom::Start(offset)) + .await + .map_err(parse_io_error)?; + (start, None) + } + (Some(offset), Some(size)) => { + let start = f + .seek(SeekFrom::Start(offset)) + .await + .map_err(parse_io_error)?; + (start, Some(size)) + } }; - let mut r = oio::into_read_from_file(f, start, end); - // Rewind to make sure we are on the correct offset. - r.seek(SeekFrom::Start(0)).await?; + let r = oio::FileReader::new(f, start, end); - Ok((RpRead::new(end - start), r)) + Ok((RpRead::new(0), r)) } async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> { @@ -344,37 +351,36 @@ impl Accessor for HdfsBackend { } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - use oio::BlockingRead; + use std::io::Seek; let p = build_rooted_abs_path(&self.root, path); - // This will be addressed by https://github.com/apache/incubator-opendal/issues/506 - let meta = self.client.metadata(&p).map_err(parse_io_error)?; - - let f = self + let mut f = self .client .open_file() .read(true) .open(&p) .map_err(parse_io_error)?; - let br = args.range(); - let (start, end) = match (br.offset(), br.size()) { - // Read a specific range. - (Some(offset), Some(size)) => (offset, min(offset + size, meta.len())), - // Read from offset. - (Some(offset), None) => (offset, meta.len()), - // Read the last size bytes. - (None, Some(size)) => (meta.len() - size, meta.len()), - // Read the whole file. - (None, None) => (0, meta.len()), + let (start, end) = match (args.range().offset(), args.range().size()) { + (None, None) => (0, None), + (None, Some(size)) => { + let start = f.seek(SeekFrom::End(size as i64)).map_err(parse_io_error)?; + (start, Some(start + size)) + } + (Some(offset), None) => { + let start = f.seek(SeekFrom::Start(offset)).map_err(parse_io_error)?; + (start, None) + } + (Some(offset), Some(size)) => { + let start = f.seek(SeekFrom::Start(offset)).map_err(parse_io_error)?; + (start, Some(size)) + } }; - let mut r = oio::into_read_from_file(f, start, end); - // Rewind to make sure we are on the correct offset. - r.seek(SeekFrom::Start(0))?; + let r = oio::FileReader::new(f, start, end); - Ok((RpRead::new(end - start), r)) + Ok((RpRead::new(0), r)) } fn blocking_write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs index 0f9b7f3bd..774221534 100644 --- a/core/src/services/sftp/backend.rs +++ b/core/src/services/sftp/backend.rs @@ -15,25 +15,27 @@ // specific language governing permissions and limitations // under the License. -use std::cmp::min; +use async_compat::Compat; use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; +use std::io::SeekFrom; use std::path::Path; use std::path::PathBuf; +use std::pin::Pin; use async_trait::async_trait; use futures::StreamExt; use log::debug; use openssh::KnownHosts; use openssh::SessionBuilder; +use openssh_sftp_client::file::TokioCompatFile; use openssh_sftp_client::Sftp; use openssh_sftp_client::SftpOptions; use super::error::is_not_found; use super::error::is_sftp_protocol_error; use super::pager::SftpPager; -use super::utils::SftpReader; use super::writer::SftpWriter; use crate::raw::*; use crate::*; @@ -224,7 +226,7 @@ impl Debug for SftpBackend { #[async_trait] impl Accessor for SftpBackend { - type Reader = SftpReader; + type Reader = oio::FileReader<Pin<Box<Compat<TokioCompatFile>>>>; type BlockingReader = (); type Writer = SftpWriter; type BlockingWriter = (); @@ -285,41 +287,51 @@ impl Accessor for SftpBackend { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + use tokio::io::AsyncSeekExt; + let client = self.connect().await?; let mut fs = client.fs(); fs.set_cwd(&self.root); let path = fs.canonicalize(path).await?; - let mut file = client.open(path.as_path()).await?; - - let total_length = file.metadata().await?.len().ok_or(Error::new( - ErrorKind::NotFound, - format!("file not found: {}", path.to_str().unwrap()).as_str(), - ))?; - - let br = args.range(); - let (start, end) = match (br.offset(), br.size()) { - // Read a specific range. - (Some(offset), Some(size)) => (offset, min(offset + size, total_length)), - // Read from offset. - (Some(offset), None) => (offset, total_length), - // Read the last size bytes. - (None, Some(size)) => ( - if total_length > size { - total_length - size - } else { - 0 - }, - total_length, - ), - // Read the whole file. - (None, None) => (0, total_length), + let mut f = client.open(path.as_path()).await?; + + let (start, end) = match (args.range().offset(), args.range().size()) { + (None, None) => (0, None), + (None, Some(size)) => { + let start = f + .seek(SeekFrom::End(size as i64)) + .await + .map_err(parse_io_error)?; + (start, Some(start + size)) + } + (Some(offset), None) => { + let start = f + .seek(SeekFrom::Start(offset)) + .await + .map_err(parse_io_error)?; + (start, None) + } + (Some(offset), Some(size)) => { + let start = f + .seek(SeekFrom::Start(offset)) + .await + .map_err(parse_io_error)?; + (start, Some(size)) + } }; - let r = SftpReader::new(file, start, end).await?; + // Sorry for the ugly code... + // + // - `f` is a openssh file. + // - `TokioCompatFile::new(f)` makes it implements tokio AsyncRead + AsyncSeek for openssh File. + // - `Compat::new(f)` make it compatible to `futures::AsyncRead + futures::AsyncSeek`. + // - `Box::pin(x)` to make sure this reader implements `Unpin`, since `TokioCompatFile` is not. + // - `oio::FileReader::new(x)` makes it a `oio::FileReader` which implements `oio::Read`. + let r = oio::FileReader::new(Box::pin(Compat::new(TokioCompatFile::new(f))), start, end); - Ok((RpRead::new(end - start), r)) + Ok((RpRead::new(0), r)) } async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> { @@ -546,3 +558,23 @@ async fn connect_sftp( Ok(sftp) } + +/// Parse all io related errors. +pub fn parse_io_error(err: std::io::Error) -> Error { + use std::io::ErrorKind::*; + + let (kind, retryable) = match err.kind() { + NotFound => (ErrorKind::NotFound, false), + PermissionDenied => (ErrorKind::PermissionDenied, false), + Interrupted | UnexpectedEof | TimedOut | WouldBlock => (ErrorKind::Unexpected, true), + _ => (ErrorKind::Unexpected, true), + }; + + let mut err = Error::new(kind, &err.kind().to_string()).set_source(err); + + if retryable { + err = err.set_temporary(); + } + + err +} diff --git a/core/src/services/sftp/utils.rs b/core/src/services/sftp/utils.rs index fc8bb8a6f..5e627b89a 100644 --- a/core/src/services/sftp/utils.rs +++ b/core/src/services/sftp/utils.rs @@ -15,83 +15,10 @@ // specific language governing permissions and limitations // under the License. -use std::io::SeekFrom; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; - -use async_compat::Compat; -use futures::AsyncBufRead; -use futures::AsyncRead; -use futures::AsyncSeek; -use openssh_sftp_client::file::File; -use openssh_sftp_client::file::TokioCompatFile; use openssh_sftp_client::metadata::MetaData as SftpMeta; -use crate::raw::oio; -use crate::raw::oio::FromFileReader; -use crate::raw::oio::ReadExt; use crate::EntryMode; use crate::Metadata; -use crate::Result; - -pub struct SftpReaderInner { - file: Pin<Box<Compat<TokioCompatFile>>>, -} -pub type SftpReader = FromFileReader<SftpReaderInner>; - -impl SftpReaderInner { - pub async fn new(file: File) -> Self { - let file = Compat::new(file.into()); - Self { - file: Box::pin(file), - } - } -} - -impl SftpReader { - /// Create a new reader from a file, starting at the given offset and ending at the given offset. - pub async fn new(file: File, start: u64, end: u64) -> Result<Self> { - let file = SftpReaderInner::new(file).await; - let mut r = oio::into_read_from_file(file, start, end); - r.seek(SeekFrom::Start(0)).await?; - Ok(r) - } -} - -impl AsyncRead for SftpReaderInner { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context, - buf: &mut [u8], - ) -> Poll<std::io::Result<usize>> { - let this = self.get_mut(); - Pin::new(&mut this.file).poll_read(cx, buf) - } -} - -impl AsyncBufRead for SftpReaderInner { - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll<std::io::Result<&[u8]>> { - let this = self.get_mut(); - Pin::new(&mut this.file).poll_fill_buf(cx) - } - - fn consume(self: Pin<&mut Self>, amt: usize) { - let this = self.get_mut(); - Pin::new(&mut this.file).consume(amt) - } -} - -impl AsyncSeek for SftpReaderInner { - fn poll_seek( - self: Pin<&mut Self>, - cx: &mut Context, - pos: SeekFrom, - ) -> Poll<std::io::Result<u64>> { - let this = self.get_mut(); - Pin::new(&mut this.file).poll_seek(cx, pos) - } -} impl From<SftpMeta> for Metadata { fn from(meta: SftpMeta) -> Self {
