This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch lazy-reader in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 08d757a4d3895ef084edc302f0b1079828e5928c Author: Xuanwo <[email protected]> AuthorDate: Wed Oct 25 23:14:37 2023 +0800 Implement tokio_read Signed-off-by: Xuanwo <[email protected]> --- core/src/raw/oio/read/mod.rs | 3 ++ core/src/raw/oio/read/tokio_read.rs | 88 +++++++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+) diff --git a/core/src/raw/oio/read/mod.rs b/core/src/raw/oio/read/mod.rs index 9ccbebdc3..e5f66a538 100644 --- a/core/src/raw/oio/read/mod.rs +++ b/core/src/raw/oio/read/mod.rs @@ -39,3 +39,6 @@ pub use into_read_from_stream::FromStreamReader; mod futures_read; pub use futures_read::FuturesReader; + +mod tokio_read; +pub use tokio_read::TokioReader; diff --git a/core/src/raw/oio/read/tokio_read.rs b/core/src/raw/oio/read/tokio_read.rs new file mode 100644 index 000000000..966973bdc --- /dev/null +++ b/core/src/raw/oio/read/tokio_read.rs @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::raw::*; +use crate::*; +use bytes::Bytes; +use std::io::SeekFrom; +use std::pin::Pin; +use std::task::{ready, Context, Poll}; +use tokio::io::AsyncSeek; +use tokio::io::{AsyncRead, ReadBuf}; + +/// TokioReader implements [`oio::Read`] via [`AsyncRead`] + [`AsyncSeek`]. 
+pub struct TokioReader<R: AsyncRead + AsyncSeek> { + inner: R, + + seek_pos: Option<SeekFrom>, +} + +impl<R: AsyncRead + AsyncSeek> TokioReader<R> { + /// Create a new tokio reader. + pub fn new(inner: R) -> Self { + Self { + inner, + seek_pos: None, + } + } +} + +impl<R> oio::Read for TokioReader<R> +where + R: AsyncRead + AsyncSeek + Unpin + Send + Sync, +{ + fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> { + let mut buf = ReadBuf::new(buf); + + ready!(Pin::new(&mut self.inner).poll_read(cx, &mut buf)).map_err(|err| { + new_std_io_error(err) + .with_operation(oio::ReadOperation::Read) + .with_context("source", "TokioReader") + })?; + + Poll::Ready(Ok(buf.filled().len())) + } + + fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> { + if self.seek_pos != Some(pos) { + Pin::new(&mut self.inner).start_seek(pos).map_err(|err| { + new_std_io_error(err) + .with_operation(oio::ReadOperation::Seek) + .with_context("source", "TokioReader") + })?; + self.seek_pos = Some(pos) + } + + // NOTE: don't return error by `?` here, we need to reset seek_pos. + let pos = ready!(Pin::new(&mut self.inner).poll_complete(cx).map_err(|err| { + new_std_io_error(err) + .with_operation(oio::ReadOperation::Seek) + .with_context("source", "TokioReader") + })); + self.seek_pos = None; + Poll::Ready(pos) + } + + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { + let _ = cx; + + Poll::Ready(Some(Err(Error::new( + ErrorKind::Unsupported, + "TokioReader doesn't support poll_next", + )))) + } +}
