This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch lazy-reader in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit c431480b8a353fee103a3f616ec4364b341df96e Author: Xuanwo <[email protected]> AuthorDate: Wed Oct 25 21:01:44 2023 +0800 Make clippy happy Signed-off-by: Xuanwo <[email protected]> --- core/src/raw/mod.rs | 3 ++ core/src/raw/oio/read/futures_read.rs | 67 ++++++++++++++++++++++++++++++++++ core/src/raw/oio/read/mod.rs | 3 ++ core/src/raw/std_io_util.rs | 48 ++++++++++++++++++++++++ core/tests/behavior/blocking_copy.rs | 6 +-- core/tests/behavior/blocking_rename.rs | 6 +-- core/tests/behavior/copy.rs | 8 ++-- core/tests/behavior/rename.rs | 6 +-- 8 files changed, 134 insertions(+), 13 deletions(-) diff --git a/core/src/raw/mod.rs b/core/src/raw/mod.rs index 313c5d04f..34f1a0c67 100644 --- a/core/src/raw/mod.rs +++ b/core/src/raw/mod.rs @@ -59,6 +59,9 @@ pub use chrono_util::*; mod tokio_util; pub use tokio_util::*; +mod std_io_util; +pub use std_io_util::*; + // Expose as a pub mod to avoid confusing. pub mod adapters; pub mod oio; diff --git a/core/src/raw/oio/read/futures_read.rs b/core/src/raw/oio/read/futures_read.rs new file mode 100644 index 000000000..03b52bbf7 --- /dev/null +++ b/core/src/raw/oio/read/futures_read.rs @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::raw::*; +use crate::*; +use bytes::Bytes; +use futures::AsyncRead; +use futures::AsyncSeek; +use std::io::SeekFrom; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// FuturesReader implements [`oio::Read`] via [`AsyncRead`] + [`AsyncSeek`]. +pub struct FuturesReader<R: AsyncRead + AsyncSeek> { + inner: R, +} + +impl<R: AsyncRead + AsyncSeek> FuturesReader<R> { + /// Create a new futures reader. + pub fn new(inner: R) -> Self { + Self { inner } + } +} + +impl<R> oio::Read for FuturesReader<R> +where + R: AsyncRead + AsyncSeek + Unpin + Send + Sync, +{ + fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> { + Pin::new(&mut self.inner).poll_read(cx, buf).map_err(|err| { + new_std_io_error(err) + .with_operation(oio::ReadOperation::Read) + .with_context("source", "FuturesReader") + }) + } + + fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> { + Pin::new(&mut self.inner).poll_seek(cx, pos).map_err(|err| { + new_std_io_error(err) + .with_operation(oio::ReadOperation::Seek) + .with_context("source", "FuturesReader") + }) + } + + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { + let _ = cx; + + Poll::Ready(Some(Err(Error::new( + ErrorKind::Unsupported, + "FuturesReader doesn't support poll_next", + )))) + } +} diff --git a/core/src/raw/oio/read/mod.rs b/core/src/raw/oio/read/mod.rs index 78e888865..9ccbebdc3 100644 --- a/core/src/raw/oio/read/mod.rs +++ b/core/src/raw/oio/read/mod.rs @@ -36,3 +36,6 @@ pub use file_read::FileReader; mod into_read_from_stream; pub use into_read_from_stream::into_read_from_stream; pub use into_read_from_stream::FromStreamReader; + +mod futures_read; +pub use futures_read::FuturesReader; diff --git a/core/src/raw/std_io_util.rs b/core/src/raw/std_io_util.rs new file mode 100644 index 000000000..a36e1e47f --- /dev/null +++ b/core/src/raw/std_io_util.rs @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::*; + +/// Parse std io error into opendal::Error. +/// +/// # TODO +/// +/// Add `NotADirectory` and `IsADirectory` once they are stable. +/// +/// ref: <https://github.com/rust-lang/rust/issues/86442> +pub fn new_std_io_error(err: std::io::Error) -> Error { + use std::io::ErrorKind::*; + + let (kind, retryable) = match err.kind() { + NotFound => (ErrorKind::NotFound, false), + PermissionDenied => (ErrorKind::PermissionDenied, false), + AlreadyExists => (ErrorKind::AlreadyExists, false), + InvalidInput => (ErrorKind::InvalidInput, false), + Unsupported => (ErrorKind::Unsupported, false), + + Interrupted | UnexpectedEof | TimedOut | WouldBlock => (ErrorKind::Unexpected, true), + _ => (ErrorKind::Unexpected, true), + }; + + let mut err = Error::new(kind, &err.kind().to_string()).set_source(err); + + if retryable { + err = err.set_temporary(); + } + + err +} diff --git a/core/tests/behavior/blocking_copy.rs b/core/tests/behavior/blocking_copy.rs index 53b26e8f0..9188e35ef 100644 --- a/core/tests/behavior/blocking_copy.rs +++ b/core/tests/behavior/blocking_copy.rs @@ -52,7 +52,7 @@ pub fn test_blocking_copy_file(op: BlockingOperator) -> Result<()> { let target_content = op.read(&target_path).expect("read must succeed"); assert_eq!( - format!("{:x}", Sha256::digest(&target_content)), + format!("{:x}", Sha256::digest(target_content)), format!("{:x}", Sha256::digest(&source_content)), ); @@ -142,7 +142,7 @@ pub fn test_blocking_copy_nested(op: BlockingOperator) -> Result<()> { let target_content = op.read(&target_path).expect("read must succeed"); assert_eq!( - format!("{:x}", Sha256::digest(&target_content)), + format!("{:x}", Sha256::digest(target_content)), format!("{:x}", Sha256::digest(&source_content)), ); @@ -168,7 +168,7 @@ pub fn test_blocking_copy_overwrite(op: BlockingOperator) -> Result<()> { let target_content = op.read(&target_path).expect("read must succeed"); assert_eq!( - format!("{:x}", Sha256::digest(&target_content)), + format!("{:x}", Sha256::digest(target_content)), format!("{:x}", Sha256::digest(&source_content)), ); diff --git a/core/tests/behavior/blocking_rename.rs b/core/tests/behavior/blocking_rename.rs index 24c1bc559..56a71dead 100644 --- a/core/tests/behavior/blocking_rename.rs +++ b/core/tests/behavior/blocking_rename.rs @@ -55,7 +55,7 @@ pub fn test_blocking_rename_file(op: BlockingOperator) -> Result<()> { let target_content = op.read(&target_path).expect("read must succeed"); assert_eq!( - format!("{:x}", Sha256::digest(&target_content)), + format!("{:x}", Sha256::digest(target_content)), format!("{:x}", Sha256::digest(&source_content)), ); @@ -148,7 +148,7 @@ pub fn test_blocking_rename_nested(op: BlockingOperator) -> Result<()> { let target_content = op.read(&target_path).expect("read must succeed"); assert_eq!( - format!("{:x}", Sha256::digest(&target_content)), + format!("{:x}", Sha256::digest(target_content)), format!("{:x}", Sha256::digest(&source_content)), ); @@ -177,7 +177,7 @@ pub fn test_blocking_rename_overwrite(op: BlockingOperator) -> Result<()> { let target_content = op.read(&target_path).expect("read must succeed"); assert_eq!( - format!("{:x}", Sha256::digest(&target_content)), + format!("{:x}", Sha256::digest(target_content)), format!("{:x}", Sha256::digest(&source_content)), ); diff --git a/core/tests/behavior/copy.rs b/core/tests/behavior/copy.rs index 7d65a2e6c..501ea33b9 100644 --- a/core/tests/behavior/copy.rs +++ b/core/tests/behavior/copy.rs @@ -53,7 +53,7 @@ pub async fn test_copy_file_with_ascii_name(op: Operator) -> Result<()> { let target_content = op.read(&target_path).await.expect("read must succeed"); assert_eq!( - format!("{:x}", Sha256::digest(&target_content)), + format!("{:x}", Sha256::digest(target_content)), format!("{:x}", Sha256::digest(&source_content)), ); @@ -73,7 +73,7 @@ pub async fn test_copy_file_with_non_ascii_name(op: Operator) -> Result<()> { let target_content = op.read(target_path).await.expect("read must succeed"); assert_eq!( - format!("{:x}", Sha256::digest(&target_content)), + format!("{:x}", Sha256::digest(target_content)), format!("{:x}", Sha256::digest(&source_content)), ); @@ -167,7 +167,7 @@ pub async fn test_copy_nested(op: Operator) -> Result<()> { let target_content = op.read(&target_path).await.expect("read must succeed"); assert_eq!( - format!("{:x}", Sha256::digest(&target_content)), + format!("{:x}", Sha256::digest(target_content)), format!("{:x}", Sha256::digest(&source_content)), ); @@ -193,7 +193,7 @@ pub async fn test_copy_overwrite(op: Operator) -> Result<()> { let target_content = op.read(&target_path).await.expect("read must succeed"); assert_eq!( - format!("{:x}", Sha256::digest(&target_content)), + format!("{:x}", Sha256::digest(target_content)), format!("{:x}", Sha256::digest(&source_content)), ); diff --git a/core/tests/behavior/rename.rs b/core/tests/behavior/rename.rs index 8880e16c8..9ed9172e3 100644 --- a/core/tests/behavior/rename.rs +++ b/core/tests/behavior/rename.rs @@ -55,7 +55,7 @@ pub async fn test_rename_file(op: Operator) -> Result<()> { let target_content = op.read(&target_path).await.expect("read must succeed"); assert_eq!( - format!("{:x}", Sha256::digest(&target_content)), + format!("{:x}", Sha256::digest(target_content)), format!("{:x}", Sha256::digest(&source_content)), ); @@ -152,7 +152,7 @@ pub async fn test_rename_nested(op: Operator) -> Result<()> { let target_content = op.read(&target_path).await.expect("read must succeed"); assert_eq!( - format!("{:x}", Sha256::digest(&target_content)), + format!("{:x}", Sha256::digest(target_content)), format!("{:x}", Sha256::digest(&source_content)), ); @@ -181,7 +181,7 @@ pub async fn test_rename_overwrite(op: Operator) -> Result<()> { let target_content = op.read(&target_path).await.expect("read must succeed"); assert_eq!( - format!("{:x}", Sha256::digest(&target_content)), + format!("{:x}", Sha256::digest(target_content)), format!("{:x}", Sha256::digest(&source_content)), );
