This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch lazy-reader in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 0c28f67d3f4970fe29dfe7059cda71ed3f0c71e1
Author: Xuanwo <[email protected]>
AuthorDate: Thu Oct 26 17:01:03 2023 +0800

    Add file reader support

    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/oio/buf/adaptive.rs   | 140 ++++++++++
 core/src/raw/oio/buf/mod.rs        |   3 +
 core/src/raw/oio/read/file_read.rs | 546 +++++++++++++++++++++++++++++--------
 core/src/services/fs/backend.rs    |  67 +----
 core/src/services/hdfs/backend.rs  |  65 +----
 core/src/services/sftp/backend.rs  |  41 +--
 6 files changed, 599 insertions(+), 263 deletions(-)

diff --git a/core/src/raw/oio/buf/adaptive.rs b/core/src/raw/oio/buf/adaptive.rs
new file mode 100644
index 000000000..eb5b72b04
--- /dev/null
+++ b/core/src/raw/oio/buf/adaptive.rs
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::{Bytes, BytesMut};
+use std::cmp;
+use tokio::io::ReadBuf;
+
+/// The default minimum adaptive buffer size is 8 KiB.
+const DEFAULT_MIN_BUFFER_SIZE: usize = 8192;
+
+/// The default maximum adaptive buffer size is 4 MiB.
+///
+/// We will not grow the buffer beyond this size.
+const DEFAULT_MAX_BUFFER_SIZE: usize = 4 * 1024 * 1024;
+
+/// AdaptiveBuf is inspired by hyper [ReadStrategy](https://github.com/hyperium/hyper/blob/master/src/proto/h1/io.rs#L26).
+///
+/// We build this adaptive buf to make our internal buf grow and shrink automatically based on IO
+/// throughput.
+pub struct AdaptiveBuf {
+    /// The underlying buffer.
+    buffer: BytesMut,
+
+    next: usize,
+    decrease_now: bool,
+}
+
+impl Default for AdaptiveBuf {
+    fn default() -> Self {
+        Self {
+            buffer: BytesMut::default(),
+            next: DEFAULT_MIN_BUFFER_SIZE,
+            decrease_now: false,
+        }
+    }
+}
+
+impl AdaptiveBuf {
+    /// Grows the buffer capacity to the current target size (`next`) if it is smaller.
+    pub fn reserve(&mut self) {
+        if self.buffer.capacity() < self.next {
+            self.buffer.reserve(self.next);
+        }
+    }
+
+    /// Returns a `ReadBuf` over the spare capacity with every byte initialized.
+    pub fn initialized_mut(&mut self) -> ReadBuf<'_> {
+        let dst = self.buffer.spare_capacity_mut();
+        let mut buf = ReadBuf::uninit(dst);
+
+        // Zero-fill instead of calling `assume_init`: this region is exposed as `&mut [u8]`
+        // through `ReadBuf::initialized_mut`, and treating never-written memory as
+        // initialized bytes is undefined behavior. `initialize_unfilled` zeroes the
+        // spare capacity and marks all of it as initialized.
+        buf.initialize_unfilled();
+        buf
+    }
+
+    /// Records the number of bytes read from the underlying IO.
+    pub fn record(&mut self, read: usize) {
+        if read >= self.next {
+            // Grow when the read filled the whole target size.
+            self.next = cmp::min(self.next.saturating_mul(2), DEFAULT_MAX_BUFFER_SIZE);
+            self.decrease_now = false;
+        } else {
+            // Shrink only after two consecutive reads use less than half of the target size.
+            let decr_to = self.next.saturating_div(2);
+            if read < decr_to {
+                if self.decrease_now {
+                    self.next = cmp::max(decr_to, DEFAULT_MIN_BUFFER_SIZE);
+                    self.decrease_now = false;
+                } else {
+                    // First small read: mark it so the next small read shrinks the buffer.
+                    self.decrease_now = true;
+                }
+            } else {
+                // The read used at least half of the target: keep the current size.
+                self.decrease_now = false;
+            }
+        }
+    }
+
+    /// Freezes the first `n` bytes of the spare capacity into `Bytes` and returns them.
+    ///
+    /// `n` must not exceed the length of the slice last obtained from `initialized_mut`.
+    pub fn split(&mut self, n: usize) -> Bytes {
+        assert!(n <= self.buffer.capacity(), "split beyond buffer capacity");
+        // SAFETY: `n <= capacity`, and `initialized_mut` zero-filled the spare capacity.
+        unsafe { self.buffer.set_len(n) }
+        self.buffer.split().freeze()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn read_strategy_adaptive_decrements() {
+        let mut huf = AdaptiveBuf::default();
+        huf.record(8192);
+        assert_eq!(huf.next, 16384);
+
+        huf.record(1);
+        assert_eq!(
+            huf.next, 16384,
+            "first smaller record doesn't decrement yet"
+        );
+        huf.record(8192);
+        assert_eq!(huf.next, 16384, "record was with range");
+
+        huf.record(1);
+        assert_eq!(
+            huf.next, 16384,
+            "in-range record should make this the 'first' again"
+        );
+
+        huf.record(1);
+        assert_eq!(huf.next, 8192, "second smaller record decrements");
+
+        huf.record(1);
+        assert_eq!(huf.next, 8192, "first doesn't decrement");
+        huf.record(1);
+        assert_eq!(huf.next, 8192, "doesn't decrement under minimum");
+    }
+}
diff --git a/core/src/raw/oio/buf/mod.rs b/core/src/raw/oio/buf/mod.rs
index dfd3663e5..abc8bf328 100644
--- a/core/src/raw/oio/buf/mod.rs
+++ b/core/src/raw/oio/buf/mod.rs
@@ -20,3 +20,6 @@ pub use chunked_bytes::ChunkedBytes;
 
 mod write_buf;
 pub use write_buf::WriteBuf;
+
+mod adaptive;
+pub use adaptive::AdaptiveBuf;
diff --git a/core/src/raw/oio/read/file_read.rs b/core/src/raw/oio/read/file_read.rs
index 852e7c47d..b06e65fef 100644
--- a/core/src/raw/oio/read/file_read.rs
+++ b/core/src/raw/oio/read/file_read.rs
@@ -16,171 +16,495 @@
 // under the License.
 
 use std::cmp;
-use std::io::Read;
-use std::io::Seek;
+
 use std::io::SeekFrom;
 use std::pin::Pin;
+use std::sync::Arc;
 use std::task::ready;
 use std::task::Context;
 use std::task::Poll;
 
 use bytes::Bytes;
-use futures::AsyncRead;
-use futures::AsyncSeek;
+use futures::future::BoxFuture;
+use futures::Future;
 
 use crate::raw::*;
 use crate::*;
 
-/// FileReader implements [`oio::Read`] via `AsyncRead + AsyncSeek`.
-pub struct FileReader<R> {
-    inner: R,
+/// FileReader implements range read and streamable read on top of a seekable reader.
+///
+/// `oio::Reader` requires the underlying reader to handle range correctly and have streamable support.
+/// But some services like `fs`, `hdfs` only have seek support. FileReader implements range and stream
+/// support based on `seek`. We will maintain the correct range for the given file and implement
+/// streamable operations based on [`oio::AdaptiveBuf`].
+pub struct FileReader<A: Accessor, R> {
+    acc: Arc<A>,
+    path: Arc<String>,
+    op: OpRead,
+
+    offset: Option<u64>,
+    size: Option<u64>,
+    cur: u64,
 
-    start: u64,
-    end: Option<u64>,
+    buf: oio::AdaptiveBuf,
+    state: State<R>,
+}
 
-    offset: u64,
+enum State<R> {
+    Idle,
+    Send(BoxFuture<'static, Result<(RpRead, R)>>),
+    Read(R),
 }
 
-impl<R> FileReader<R> {
+// SAFETY: `State` is only ever accessed through `&mut self`, never through a shared reference.
+unsafe impl<R> Sync for State<R> {}
+
+impl<A, R> FileReader<A, R>
+where
+    A: Accessor,
+{
     /// Create a new FileReader.
     ///
     /// # Notes
     ///
     /// It's required that input reader's cursor is at the input `start` of the file.
-    pub fn new(fd: R, start: u64, end: Option<u64>) -> FileReader<R> {
+    pub fn new(acc: Arc<A>, path: &str, op: OpRead) -> FileReader<A, R> {
         FileReader {
-            inner: fd,
-            start,
-            end,
+            acc,
+            path: Arc::new(path.to_string()),
+            op,
 
-            offset: start,
+            offset: None,
+            size: None,
+            cur: 0,
+            buf: oio::AdaptiveBuf::default(),
+            state: State::<R>::Idle,
         }
     }
+}
 
-    fn calculate_position(&self, pos: SeekFrom) -> Result<SeekFrom> {
-        match pos {
-            SeekFrom::Start(n) => {
-                if n < self.start {
-                    return Err(Error::new(
-                        ErrorKind::InvalidInput,
-                        "seek to a negative position is invalid",
-                    )
-                    .with_context("position", format!("{pos:?}")));
-                }
+impl<A, R> FileReader<A, R>
+where
+    A: Accessor<Reader = R>,
+    R: oio::Read,
+{
+    fn read_future(&self) -> BoxFuture<'static, Result<(RpRead, R)>> {
+        let acc = self.acc.clone();
+        let path = self.path.clone();
+
+        // The underlying reader can't serve ranges, so always open the whole file and seek.
+        let op = self.op.clone().with_range(BytesRange::from(..));
 
-                Ok(SeekFrom::Start(self.start + n))
+        Box::pin(async move { acc.read(&path, op).await })
+    }
+}
+
+impl<A, R> oio::Read for FileReader<A, R>
+where
+    A: Accessor<Reader = R>,
+    R: oio::Read,
+{
+    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
+        match &mut self.state {
+            State::Idle => {
+                self.state = State::Send(self.read_future());
+                self.poll_read(cx, buf)
+            }
+            State::Send(fut) => {
+                let (_, r) = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
+                    // If send future returns an error, we should reset
+                    // state to Idle so that we can retry it.
+                    self.state = State::Idle;
+                    err
+                })?;
+                self.state = State::Read(r);
+                self.poll_read(cx, buf)
             }
-            SeekFrom::End(n) => {
-                let end = if let Some(end) = self.end {
-                    end as i64 + n
+            State::Read(r) => {
+                // Seek to the start of the requested range the first time the reader is used.
+                if self.offset.is_none() {
+                    let (offset, size) = match (self.op.range().offset(), self.op.range().size()) {
+                        (None, None) => (0, None),
+                        (None, Some(size)) => {
+                            let start = ready!(r.poll_seek(cx, SeekFrom::End(-(size as i64))))?;
+                            (start, Some(size))
+                        }
+                        (Some(offset), None) => {
+                            let start = ready!(r.poll_seek(cx, SeekFrom::Start(offset)))?;
+                            (start, None)
+                        }
+                        (Some(offset), Some(size)) => {
+                            let start = ready!(r.poll_seek(cx, SeekFrom::Start(offset)))?;
+                            (start, Some(size))
+                        }
+                    };
+                    self.offset = Some(offset);
+                    self.size = size;
+                }
+                let size = if let Some(size) = self.size {
+                    // The requested range is exhausted.
+                    if self.cur >= size {
+                        return Poll::Ready(Ok(0));
+                    }
+                    cmp::min(buf.len(), (size - self.cur) as usize)
                 } else {
-                    n
+                    buf.len()
                 };
 
-                if self.start as i64 + end < 0 {
-                    return Err(Error::new(
-                        ErrorKind::InvalidInput,
-                        "seek to a negative position is invalid",
-                    )
-                    .with_context("position", format!("{pos:?}")));
+                match ready!(r.poll_read(cx, &mut buf[..size])) {
+                    Ok(0) => Poll::Ready(Ok(0)),
+                    Ok(n) => {
+                        self.cur += n as u64;
+                        Poll::Ready(Ok(n))
+                    }
+                    // We don't need to reset state here since it's ok to poll the same reader.
+                    Err(err) => Poll::Ready(Err(err)),
                 }
+            }
+        }
+    }
 
-                Ok(SeekFrom::End(end))
+    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> {
+        match &mut self.state {
+            State::Idle => {
+                self.state = State::Send(self.read_future());
+                self.poll_seek(cx, pos)
             }
-            SeekFrom::Current(n) => {
-                if self.offset as i64 + n < self.start as i64 {
-                    return Err(Error::new(
-                        ErrorKind::InvalidInput,
-                        "seek to a negative position is invalid",
-                    )
-                    .with_context("position", format!("{pos:?}")));
+            State::Send(fut) => {
+                let (_, r) = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
+                    // If send future returns an error, we should reset
+                    // state to Idle so that we can retry it.
+                    self.state = State::Idle;
+                    err
+                })?;
+                self.state = State::Read(r);
+                self.poll_seek(cx, pos)
+            }
+            State::Read(r) => {
+                // Seek to the start of the requested range the first time the reader is used.
+                if self.offset.is_none() {
+                    let (offset, size) = match (self.op.range().offset(), self.op.range().size()) {
+                        (None, None) => (0, None),
+                        (None, Some(size)) => {
+                            let start = ready!(r.poll_seek(cx, SeekFrom::End(-(size as i64))))?;
+                            (start, Some(size))
+                        }
+                        (Some(offset), None) => {
+                            let start = ready!(r.poll_seek(cx, SeekFrom::Start(offset)))?;
+                            (start, None)
+                        }
+                        (Some(offset), Some(size)) => {
+                            let start = ready!(r.poll_seek(cx, SeekFrom::Start(offset)))?;
+                            (start, Some(size))
+                        }
+                    };
+                    self.offset = Some(offset);
+                    self.size = size;
                 }
-
-                Ok(SeekFrom::Current(n))
+                let pos = calculate_position(self.offset, self.size, self.cur, pos)?;
+                let cur = ready!(r.poll_seek(cx, pos))?;
+                self.cur = cur - self.offset.unwrap();
+                Poll::Ready(Ok(self.cur))
             }
         }
     }
-}
 
-impl<R> oio::Read for FileReader<R>
-where
-    R: AsyncRead + AsyncSeek + Unpin + Send + Sync,
-{
-    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
-        let size = if let Some(end) = self.end {
-            if self.offset >= end {
-                return Poll::Ready(Ok(0));
-            }
-            cmp::min(buf.len(), (end - self.offset) as usize)
-        } else {
-            buf.len()
-        };
-
-        let n =
-            ready!(Pin::new(&mut self.inner).poll_read(cx, &mut buf[..size])).map_err(|err| {
-                Error::new(ErrorKind::Unexpected, "read data from FileReader").set_source(err)
-            })?;
-        self.offset += n as u64;
-        Poll::Ready(Ok(n))
-    }
-
-    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> {
-        let pos = self.calculate_position(pos)?;
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
+        match &mut self.state {
+            State::Idle => {
+                self.state = State::Send(self.read_future());
+                self.poll_next(cx)
+            }
+            State::Send(fut) => {
+                let (_, r) = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
+                    // If send future returns an error, we should reset
+                    // state to Idle so that we can retry it.
+                    self.state = State::Idle;
+                    err
+                })?;
+                self.state = State::Read(r);
+                self.poll_next(cx)
+            }
+            State::Read(r) => {
+                // Seek to the start of the requested range the first time the reader is used.
+                if self.offset.is_none() {
+                    let (offset, size) = match (self.op.range().offset(), self.op.range().size()) {
+                        (None, None) => (0, None),
+                        (None, Some(size)) => {
+                            let start = ready!(r.poll_seek(cx, SeekFrom::End(-(size as i64))))?;
+                            (start, Some(size))
+                        }
+                        (Some(offset), None) => {
+                            let start = ready!(r.poll_seek(cx, SeekFrom::Start(offset)))?;
+                            (start, None)
+                        }
+                        (Some(offset), Some(size)) => {
+                            let start = ready!(r.poll_seek(cx, SeekFrom::Start(offset)))?;
+                            (start, Some(size))
+                        }
+                    };
+                    self.offset = Some(offset);
+                    self.size = size;
+                }
 
-        let cur = ready!(Pin::new(&mut self.inner).poll_seek(cx, pos)).map_err(|err| {
-            Error::new(ErrorKind::Unexpected, "seek data from FileReader").set_source(err)
-        })?;
+                self.buf.reserve();
 
-        self.offset = cur;
-        Poll::Ready(Ok(self.offset - self.start))
-    }
+                let mut buf = self.buf.initialized_mut();
+                let buf = buf.initialized_mut();
 
-    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
-        let _ = cx;
+                let size = if let Some(size) = self.size {
+                    // The requested range is exhausted.
+                    if self.cur >= size {
+                        return Poll::Ready(None);
+                    }
+                    cmp::min(buf.len(), (size - self.cur) as usize)
+                } else {
+                    buf.len()
+                };
 
-        Poll::Ready(Some(Err(Error::new(
-            ErrorKind::Unsupported,
-            "output reader doesn't support next",
-        ))))
+                match ready!(r.poll_read(cx, &mut buf[..size])) {
+                    Ok(0) => Poll::Ready(None),
+                    Ok(n) => {
+                        self.cur += n as u64;
+                        self.buf.record(n);
+                        Poll::Ready(Some(Ok(self.buf.split(n))))
+                    }
+                    // We don't need to reset state here since it's ok to poll the same reader.
+                    Err(err) => Poll::Ready(Some(Err(err))),
+                }
+            }
+        }
     }
 }
 
-impl<R> oio::BlockingRead for FileReader<R>
+impl<A, R> oio::BlockingRead for FileReader<A, R>
 where
-    R: Read + Seek + Send + Sync + 'static,
+    A: Accessor<BlockingReader = R>,
+    R: oio::BlockingRead,
 {
     fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
-        let size = if let Some(end) = self.end {
-            if self.offset >= end {
-                return Ok(0);
-            }
-            cmp::min(buf.len(), (end - self.offset) as usize)
-        } else {
-            buf.len()
-        };
-
-        let n = self.inner.read(&mut buf[..size]).map_err(|err| {
-            Error::new(ErrorKind::Unexpected, "read data from FileReader").set_source(err)
-        })?;
-        self.offset += n as u64;
-        Ok(n)
+        match &mut self.state {
+            State::Idle => {
+                // The underlying reader can't serve ranges, so always open the whole file and seek.
+                let op = self.op.clone().with_range(BytesRange::from(..));
+
+                let (_, r) = self.acc.blocking_read(&self.path, op)?;
+                self.state = State::Read(r);
+                self.read(buf)
+            }
+
+            State::Read(r) => {
+                // Seek to the start of the requested range the first time the reader is used.
+                if self.offset.is_none() {
+                    let (offset, size) = match (self.op.range().offset(), self.op.range().size()) {
+                        (None, None) => (0, None),
+                        (None, Some(size)) => {
+                            let start = r.seek(SeekFrom::End(-(size as i64)))?;
+                            (start, Some(size))
+                        }
+                        (Some(offset), None) => {
+                            let start = r.seek(SeekFrom::Start(offset))?;
+                            (start, None)
+                        }
+                        (Some(offset), Some(size)) => {
+                            let start = r.seek(SeekFrom::Start(offset))?;
+                            (start, Some(size))
+                        }
+                    };
+                    self.offset = Some(offset);
+                    self.size = size;
+                }
+                let size = if let Some(size) = self.size {
+                    // The requested range is exhausted.
+                    if self.cur >= size {
+                        return Ok(0);
+                    }
+                    cmp::min(buf.len(), (size - self.cur) as usize)
+                } else {
+                    buf.len()
+                };
+
+                match r.read(&mut buf[..size]) {
+                    Ok(0) => Ok(0),
+                    Ok(n) => {
+                        self.cur += n as u64;
+                        Ok(n)
+                    }
+                    // We don't need to reset state here since it's ok to poll the same reader.
+                    Err(err) => Err(err),
+                }
+            }
+            State::Send(_) => {
+                unreachable!(
+                    "It's invalid to go into State::Send for BlockingRead, please report this bug"
+                )
+            }
+        }
     }
 
     fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
-        let pos = self.calculate_position(pos)?;
-
-        let cur = self.inner.seek(pos).map_err(|err| {
-            Error::new(ErrorKind::Unexpected, "seek data from FileReader").set_source(err)
-        })?;
+        match &mut self.state {
+            State::Idle => {
+                // The underlying reader can't serve ranges, so always open the whole file and seek.
+                let op = self.op.clone().with_range(BytesRange::from(..));
 
-        self.offset = cur;
-        Ok(self.offset - self.start)
+                let (_, r) = self.acc.blocking_read(&self.path, op)?;
+                self.state = State::Read(r);
+                self.seek(pos)
+            }
+            State::Read(r) => {
+                // Seek to the start of the requested range the first time the reader is used.
+                if self.offset.is_none() {
+                    let (offset, size) = match (self.op.range().offset(), self.op.range().size()) {
+                        (None, None) => (0, None),
+                        (None, Some(size)) => {
+                            let start = r.seek(SeekFrom::End(-(size as i64)))?;
+                            (start, Some(size))
+                        }
+                        (Some(offset), None) => {
+                            let start = r.seek(SeekFrom::Start(offset))?;
+                            (start, None)
+                        }
+                        (Some(offset), Some(size)) => {
+                            let start = r.seek(SeekFrom::Start(offset))?;
+                            (start, Some(size))
+                        }
+                    };
+                    self.offset = Some(offset);
+                    self.size = size;
+                }
+                let pos = calculate_position(self.offset, self.size, self.cur, pos)?;
+                let cur = r.seek(pos)?;
+                self.cur = cur - self.offset.unwrap();
+                Ok(self.cur)
+            }
+            State::Send(_) => {
+                unreachable!(
+                    "It's invalid to go into State::Send for BlockingRead, please report this bug"
+                )
+            }
+        }
     }
 
     fn next(&mut self) -> Option<Result<Bytes>> {
-        Some(Err(Error::new(
-            ErrorKind::Unsupported,
-            "output reader doesn't support iterating",
-        )))
+        match &mut self.state {
+            State::Idle => {
+                // The underlying reader can't serve ranges, so always open the whole file and seek.
+                let op = self.op.clone().with_range(BytesRange::from(..));
+
+                let r = match self.acc.blocking_read(&self.path, op) {
+                    Ok((_, r)) => r,
+                    Err(err) => return Some(Err(err)),
+                };
+                self.state = State::Read(r);
+                self.next()
+            }
+
+            State::Read(r) => {
+                // Seek to the start of the requested range the first time the reader is used.
+                if self.offset.is_none() {
+                    let (offset, size) = match (self.op.range().offset(), self.op.range().size()) {
+                        (None, None) => (0, None),
+                        (None, Some(size)) => {
+                            let start = match r.seek(SeekFrom::End(-(size as i64))) {
+                                Ok(v) => v,
+                                Err(err) => return Some(Err(err)),
+                            };
+                            (start, Some(size))
+                        }
+                        (Some(offset), None) => {
+                            let start = match r.seek(SeekFrom::Start(offset)) {
+                                Ok(v) => v,
+                                Err(err) => return Some(Err(err)),
+                            };
+                            (start, None)
+                        }
+                        (Some(offset), Some(size)) => {
+                            let start = match r.seek(SeekFrom::Start(offset)) {
+                                Ok(v) => v,
+                                Err(err) => return Some(Err(err)),
+                            };
+                            (start, Some(size))
+                        }
+                    };
+                    self.offset = Some(offset);
+                    self.size = size;
+                }
+
+                self.buf.reserve();
+
+                let mut buf = self.buf.initialized_mut();
+                let buf = buf.initialized_mut();
+
+                let size = if let Some(size) = self.size {
+                    // The requested range is exhausted.
+                    if self.cur >= size {
+                        return None;
+                    }
+                    cmp::min(buf.len(), (size - self.cur) as usize)
+                } else {
+                    buf.len()
+                };
+
+                match r.read(&mut buf[..size]) {
+                    Ok(0) => None,
+                    Ok(n) => {
+                        self.cur += n as u64;
+                        self.buf.record(n);
+                        Some(Ok(self.buf.split(n)))
+                    }
+                    // We don't need to reset state here since it's ok to poll the same reader.
+                    Err(err) => Some(Err(err)),
+                }
+            }
+            State::Send(_) => {
+                unreachable!(
+                    "It's invalid to go into State::Send for BlockingRead, please report this bug"
+                )
+            }
+        }
+    }
+}
+
+/// Calculate the position to seek the underlying reader to, given a range-relative `pos`.
+fn calculate_position(
+    offset: Option<u64>,
+    size: Option<u64>,
+    cur: u64,
+    pos: SeekFrom,
+) -> Result<SeekFrom> {
+    let offset = offset.expect("offset should be set for calculate_position");
+
+    match pos {
+        SeekFrom::Start(n) => {
+            // Seeking past the end of the file is valid.
+            Ok(SeekFrom::Start(offset + n))
+        }
+        SeekFrom::End(n) => {
+            if let Some(size) = size {
+                if size as i64 + n < 0 {
+                    return Err(Error::new(
+                        ErrorKind::InvalidInput,
+                        "seek to a negative position is invalid",
+                    )
+                    .with_context("position", format!("{pos:?}")));
+                }
+                // The range size is known, so SeekFrom::End can be converted into SeekFrom::Start.
+                Ok(SeekFrom::Start(offset + (size as i64 + n) as u64))
+            } else {
+                // The range size is unknown, so forward SeekFrom::End to the underlying reader.
+                Ok(SeekFrom::End(n))
+            }
+        }
+        SeekFrom::Current(n) => {
+            if cur as i64 + n < 0 {
+                return Err(Error::new(
+                    ErrorKind::InvalidInput,
+                    "seek to a negative position is invalid",
+                )
+                .with_context("position", format!("{pos:?}")));
+            }
+            Ok(SeekFrom::Start(offset + (cur as i64 + n) as u64))
+        }
     }
 }
diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs
index 524c4ef5e..034d75e03 100644
--- a/core/src/services/fs/backend.rs
+++ b/core/src/services/fs/backend.rs
@@ -16,11 +16,9 @@
 // under the License.
 
 use std::collections::HashMap;
-use std::io::SeekFrom;
 use std::path::Path;
 use std::path::PathBuf;
 
-use async_compat::Compat;
 use async_trait::async_trait;
 use chrono::DateTime;
 use log::debug;
@@ -246,8 +244,8 @@ impl FsBackend {
 
 #[async_trait]
 impl Accessor for FsBackend {
-    type Reader = oio::FileReader<Compat<tokio::fs::File>>;
-    type BlockingReader = oio::FileReader<std::fs::File>;
+    type Reader = oio::TokioReader<tokio::fs::File>;
+    type BlockingReader = oio::StdReader<std::fs::File>;
     type Writer = FsWriter<tokio::fs::File>;
     type BlockingWriter = FsWriter<std::fs::File>;
     type Pager = Option<FsPager<tokio::fs::ReadDir>>;
@@ -262,7 +260,6 @@ impl Accessor for FsBackend {
 
                 read: true,
                 read_can_seek: true,
-                read_with_range: true,
 
                 write: true,
                 write_can_empty: true,
@@ -303,12 +300,10 @@ impl Accessor for FsBackend {
     /// - open file first, and than use `seek`. (100ns)
     ///
     /// Benchmark could be found [here](https://gist.github.com/Xuanwo/48f9cfbc3022ea5f865388bb62e1a70f)
-    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
-        use tokio::io::AsyncSeekExt;
-
+    async fn read(&self, path: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
         let p = self.root.join(path.trim_end_matches('/'));
 
-        let mut f = tokio::fs::OpenOptions::new()
+        let f = tokio::fs::OpenOptions::new()
             .read(true)
             .open(&p)
             .await
@@ -331,33 +326,7 @@ impl Accessor for FsBackend {
             }
         }
 
-        let (start, end) = match (args.range().offset(), args.range().size()) {
-            (None, None) => (0, None),
-            (None, Some(size)) => {
-                let start = f
-                    .seek(SeekFrom::End(size as i64))
-                    .await
-                    .map_err(new_std_io_error)?;
-                (start, Some(start + size))
-            }
-            (Some(offset), None) => {
-                let start = f
-                    .seek(SeekFrom::Start(offset))
-                    .await
-                    .map_err(new_std_io_error)?;
-                (start, None)
-            }
-            (Some(offset), Some(size)) => {
-                let start = f
-                    .seek(SeekFrom::Start(offset))
-                    .await
-                    .map_err(new_std_io_error)?;
-                (start, Some(size))
-            }
-        };
-
-        let r = oio::FileReader::new(Compat::new(f), start, end);
-
+        let r = oio::TokioReader::new(f);
         Ok((RpRead::new(0), r))
     }
 
@@ -504,12 +473,10 @@ impl Accessor for FsBackend {
         Ok(RpCreateDir::default())
     }
 
-    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
-        use std::io::Seek;
-
+    fn blocking_read(&self, path: &str, _: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
         let p = self.root.join(path.trim_end_matches('/'));
 
-        let mut f = std::fs::OpenOptions::new()
+        let f = std::fs::OpenOptions::new()
             .read(true)
             .open(p)
             .map_err(new_std_io_error)?;
@@ -531,25 +498,7 @@ impl Accessor for FsBackend {
             }
         }
 
-        let (start, end) = match (args.range().offset(), args.range().size()) {
-            (None, None) => (0, None),
-            (None, Some(size)) => {
-                let start = f
-                    .seek(SeekFrom::End(size as i64))
-                    .map_err(new_std_io_error)?;
-                (start, Some(start + size))
-            }
-            (Some(offset), None) => {
-                let start = f.seek(SeekFrom::Start(offset)).map_err(new_std_io_error)?;
-                (start, None)
-            }
-            (Some(offset), Some(size)) => {
-                let start = f.seek(SeekFrom::Start(offset)).map_err(new_std_io_error)?;
-                (start, Some(size))
-            }
-        };
-
-        let r = oio::FileReader::new(f, start, end);
+        let r = oio::StdReader::new(f);
         Ok((RpRead::new(0), r))
     }
 
diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs
index a7fb23f0c..fed635b15 100644
--- a/core/src/services/hdfs/backend.rs
+++ b/core/src/services/hdfs/backend.rs
@@ -18,7 +18,6 @@
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::io;
-use std::io::SeekFrom;
 use std::path::PathBuf;
 use std::sync::Arc;
 
@@ -158,8 +157,8 @@ unsafe impl Sync for HdfsBackend {}
 
 #[async_trait]
 impl Accessor for HdfsBackend {
-    type Reader = oio::FileReader<hdrs::AsyncFile>;
-    type BlockingReader = oio::FileReader<hdrs::File>;
+    type Reader = oio::FuturesReader<hdrs::AsyncFile>;
+    type BlockingReader = oio::StdReader<hdrs::File>;
     type Writer = HdfsWriter<hdrs::AsyncFile>;
     type BlockingWriter = HdfsWriter<hdrs::File>;
     type Pager = Option<HdfsPager>;
@@ -174,7 +173,6 @@ impl Accessor for HdfsBackend {
 
                 read: true,
                 read_can_seek: true,
-                read_with_range: true,
 
                 write: true,
                 // TODO: wait for https://github.com/apache/incubator-opendal/pull/2715
@@ -202,12 +200,10 @@ impl Accessor for HdfsBackend {
         Ok(RpCreateDir::default())
     }
 
-    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
-        use futures::AsyncSeekExt;
-
+    async fn read(&self, path: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
         let p = build_rooted_abs_path(&self.root, path);
 
-        let mut f = self
+        let f = self
             .client
             .open_file()
             .read(true)
@@ -215,32 +211,7 @@ impl Accessor for HdfsBackend {
             .await
             .map_err(new_std_io_error)?;
 
-        let (start, end) = match (args.range().offset(), args.range().size()) {
-            (None, None) => (0, None),
-            (None, Some(size)) => {
-                let start = f
-                    .seek(SeekFrom::End(size as i64))
-                    .await
-                    .map_err(new_std_io_error)?;
-                (start, Some(start + size))
-            }
-            (Some(offset), None) => {
-                let start = f
-                    .seek(SeekFrom::Start(offset))
-                    .await
-                    .map_err(new_std_io_error)?;
-                (start, None)
-            }
-            (Some(offset), Some(size)) => {
-                let start = f
-                    .seek(SeekFrom::Start(offset))
-                    .await
-                    .map_err(new_std_io_error)?;
-                (start, Some(size))
-            }
-        };
-
-        let r = oio::FileReader::new(f, start, end);
+        let r = oio::FuturesReader::new(f);
         Ok((RpRead::new(0), r))
     }
 
@@ -352,37 +323,17 @@ impl Accessor for HdfsBackend {
         Ok(RpCreateDir::default())
     }
 
-    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
-        use std::io::Seek;
-
+    fn blocking_read(&self, path: &str, _: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
         let p = build_rooted_abs_path(&self.root, path);
 
-        let mut f = self
+        let f = self
             .client
             .open_file()
             .read(true)
             .open(&p)
             .map_err(new_std_io_error)?;
 
-        let (start, end) = match (args.range().offset(), args.range().size()) {
-            (None, None) => (0, None),
-            (None, Some(size)) => {
-                let start = f
-                    .seek(SeekFrom::End(size as i64))
-                    .map_err(new_std_io_error)?;
-                (start, Some(start + size))
-            }
-            (Some(offset), None) => {
-                let start = f.seek(SeekFrom::Start(offset)).map_err(new_std_io_error)?;
-                (start, None)
-            }
-            (Some(offset), Some(size)) => {
-                let start = f.seek(SeekFrom::Start(offset)).map_err(new_std_io_error)?;
-                (start, Some(size))
-            }
-        };
-
-        let r = oio::FileReader::new(f, start, end);
+        let r = oio::StdReader::new(f);
         Ok((RpRead::new(0), r))
     }
 
diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs
index 258c77066..5c67dab76 100644
--- a/core/src/services/sftp/backend.rs
+++ b/core/src/services/sftp/backend.rs
@@ -15,11 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use async_compat::Compat;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fmt::Formatter;
-use std::io::SeekFrom;
 use std::path::Path;
 use std::path::PathBuf;
 use std::pin::Pin;
@@ -226,7 +224,7 @@ impl Debug for SftpBackend {
 
 #[async_trait]
 impl Accessor for SftpBackend {
-    type Reader = oio::FileReader<Pin<Box<Compat<TokioCompatFile>>>>;
+    type Reader = oio::TokioReader<Pin<Box<TokioCompatFile>>>;
     type BlockingReader = ();
     type Writer = SftpWriter;
     type BlockingWriter = ();
@@ -241,7 +239,6 @@ impl Accessor for SftpBackend {
                 stat: true,
 
                 read: true,
-                read_with_range: true,
                 read_can_seek: true,
 
                 write: true,
@@ -286,50 +283,22 @@ impl Accessor for SftpBackend {
         return Ok(RpCreateDir::default());
     }
 
-    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
-        use tokio::io::AsyncSeekExt;
-
+    async fn read(&self, path: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
         let client = self.connect().await?;
 
         let mut fs = client.fs();
         fs.set_cwd(&self.root);
         let path = fs.canonicalize(path).await?;
 
-        let mut f = client.open(path.as_path()).await?;
-
-        let (start, end) = match (args.range().offset(), args.range().size()) {
-            (None, None) => (0, None),
-            (None, Some(size)) => {
-                let start = f
-                    .seek(SeekFrom::End(size as i64))
-                    .await
-                    .map_err(new_std_io_error)?;
-                (start, Some(start + size))
-            }
-            (Some(offset), None) => {
-                let start = f
-                    .seek(SeekFrom::Start(offset))
-                    .await
-                    .map_err(new_std_io_error)?;
-                (start, None)
-            }
-            (Some(offset), Some(size)) => {
-                let start = f
-                    .seek(SeekFrom::Start(offset))
-                    .await
-                    .map_err(new_std_io_error)?;
-                (start, Some(size))
-            }
-        };
+        let f = client.open(path.as_path()).await?;
 
         // Sorry for the ugly code...
         //
         // - `f` is a openssh file.
         // - `TokioCompatFile::new(f)` makes it implements tokio AsyncRead + AsyncSeek for openssh File.
-        // - `Compat::new(f)` make it compatible to `futures::AsyncRead + futures::AsyncSeek`.
         // - `Box::pin(x)` to make sure this reader implements `Unpin`, since `TokioCompatFile` is not.
-        // - `oio::FileReader::new(x)` makes it a `oio::FileReader` which implements `oio::Read`.
-        let r = oio::FileReader::new(Box::pin(Compat::new(TokioCompatFile::new(f))), start, end);
+        // - `oio::TokioReader::new(x)` wraps it in an `oio::TokioReader`, which implements `oio::Read`.
+        let r = oio::TokioReader::new(Box::pin(TokioCompatFile::new(f)));
 
         Ok((RpRead::new(0), r))
     }
